Skip to content

Commit

Permalink
*: support two nodes roll update radondb#121
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jul 13, 2021
1 parent 66f4b64 commit edaf531
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 39 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ type ClusterStatus struct {
Conditions []ClusterCondition `json:"conditions,omitempty"`
// Nodes contains the list of the node status fulfilled.
Nodes []NodeStatus `json:"nodes,omitempty"`
// IgnoreTackle ignore the tackle.
IgnoreTackle bool `json:"ignoreTackle,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
3 changes: 3 additions & 0 deletions charts/mysql-operator/crds/mysql.radondb.com_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,9 @@ spec:
- type
type: object
type: array
ignoreTackle:
description: IgnoreTackle ignore the tackle.
type: boolean
nodes:
description: Nodes contains the list of the node status fulfilled.
items:
Expand Down
146 changes: 141 additions & 5 deletions cluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package syncer

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/iancoleman/strcase"
Expand All @@ -39,6 +42,7 @@ import (

"github.com/radondb/radondb-mysql-kubernetes/cluster"
"github.com/radondb/radondb-mysql-kubernetes/cluster/container"
"github.com/radondb/radondb-mysql-kubernetes/internal"
"github.com/radondb/radondb-mysql-kubernetes/utils"
)

Expand Down Expand Up @@ -159,22 +163,31 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
return controllerutil.OperationResultNone, err
}

if err := s.updatePod(ctx, s.sfs); err != nil {
if err := s.updatePod(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

return controllerutil.OperationResultUpdated, nil
}

// updatePod update the pods.
func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulSet) error {
if sfs.Status.UpdatedReplicas >= sfs.Status.Replicas {
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas {
return nil
}

log.Info("statefulSet was changed, run update")
defer func() {
if s.Status.IgnoreTackle {
s.Status.IgnoreTackle = false
}

if sErr := s.cli.Status().Update(ctx, s.Unwrap()); sErr != nil {
log.Error(sErr, "failed to update cluster status")
}
}()

if sfs.Status.ReadyReplicas < sfs.Status.Replicas {
if s.sfs.Status.ReadyReplicas < s.sfs.Status.Replicas {
log.Info("can't start/continue 'update': waiting for all replicas are ready")
return nil
}
Expand All @@ -183,13 +196,17 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS
if err := s.cli.List(ctx,
&pods,
&client.ListOptions{
Namespace: sfs.Namespace,
Namespace: s.sfs.Namespace,
LabelSelector: s.GetLabels().AsSelector(),
},
); err != nil {
return err
}

if err := s.preUpdate(ctx, pods); err != nil {
return err
}

// get the leader pod.
var leaderPod corev1.Pod
for _, pod := range pods.Items {
Expand Down Expand Up @@ -219,6 +236,100 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS
return nil
}

// preUpdate runs the extra preparation needed before the pods of a two-node
// cluster are updated. It returns nil immediately unless the statefulset
// reports exactly two replicas.
//
// For a two-node cluster it:
//  1. requires every listed pod to carry the label healthy=yes and finds the
//     pods labelled role=leader and role=follower;
//  2. sets Status.IgnoreTackle and persists the cluster status;
//  3. reads the operator and root credentials from the cluster secret;
//  4. connects to the leader's mysql and sets super_read_only=on;
//  5. polls runner.CheckProcesslist through retry;
//  6. sends PUT /v1/raft/disablechecksemisync to xenon on the leader host.
//
// Any failure in these steps is returned to the caller.
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, pods corev1.PodList) error {
	if s.sfs.Status.Replicas != 2 {
		return nil
	}

	// Find the leader and the follower; refuse to continue if any pod is unhealthy.
	var leaderPod, followerPod corev1.Pod
	for _, pod := range pods.Items {
		if pod.ObjectMeta.Labels["healthy"] != "yes" {
			return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
		}

		switch pod.ObjectMeta.Labels["role"] {
		case "leader":
			leaderPod = pod
		case "follower":
			followerPod = pod
		}
	}

	if leaderPod.Name == "" || followerPod.Name == "" {
		return fmt.Errorf("failed to get the leader or follower pod")
	}

	// Persist the flag before touching the leader: the status syncer reads
	// Status.IgnoreTackle to decide whether to force the leader writable again.
	s.Status.IgnoreTackle = true
	if err := s.cli.Status().Update(ctx, s.Unwrap()); err != nil {
		return err
	}

	sctName := s.GetNameForResource(utils.Secret)
	svcName := s.GetNameForResource(utils.HeadlessSVC)
	port := utils.MysqlPort
	nameSpace := s.Namespace

	// Use the caller's context so cancellation and deadlines are honoured.
	secret := &corev1.Secret{}
	if err := s.cli.Get(ctx,
		types.NamespacedName{
			Namespace: nameSpace,
			Name:      sctName,
		},
		secret,
	); err != nil {
		return fmt.Errorf("failed to get the secret: %s: %w", sctName, err)
	}

	// Report the missing key name; never format the credential value itself.
	user, ok := secret.Data["operator-user"]
	if !ok {
		return fmt.Errorf("failed to get the user: key %q not found in secret %s", "operator-user", sctName)
	}
	password, ok := secret.Data["operator-password"]
	if !ok {
		return fmt.Errorf("failed to get the password: key %q not found in secret %s", "operator-password", sctName)
	}
	rootPasswd, ok := secret.Data["root-password"]
	if !ok {
		return fmt.Errorf("failed to get the root password: key %q not found in secret %s", "root-password", sctName)
	}

	host := fmt.Sprintf("%s.%s.%s", leaderPod.Name, svcName, nameSpace)
	runner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), host, port)
	if err != nil {
		log.Error(err, "failed to connect the mysql", "node", leaderPod.Name)
		return err
	}
	defer runner.Close()

	// Stop writes on the leader.
	if err = runner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
		log.Error(err, "failed to set leader read only", "node", leaderPod.Name)
		return err
	}

	// Poll the leader's process list; the two durations are presumably the
	// polling interval and the overall limit — confirm against retry.
	if err = retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
		return runner.CheckProcesslist()
	}); err != nil {
		return err
	}

	if err = retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
		// NOTE(review): leaderPod is the snapshot selected above because its
		// role label is "leader", so this branch does not run as written.
		// Confirm whether the pod should be re-fetched on every attempt.
		if leaderPod.ObjectMeta.Labels["role"] != "leader" {
			if _, err := xenonHttpRequest(host, "POST", "/v1/raft/trytoleader", rootPasswd, nil); err != nil {
				return false, err
			}
		}

		if _, err := xenonHttpRequest(host, "PUT", "/v1/raft/disablechecksemisync", rootPasswd, nil); err != nil {
			return false, err
		}

		return true, nil
	}); err != nil {
		return err
	}

	return nil
}

// mutate set the statefulset.
func (s *StatefulSetSyncer) mutate() error {
s.sfs.Spec.ServiceName = s.GetNameForResource(utils.StatefulSet)
Expand Down Expand Up @@ -384,3 +495,28 @@ func basicEventReason(objKindName string, err error) string {

return fmt.Sprintf("%sSyncSuccessfull", strcase.ToCamel(objKindName))
}

// xenonHttpRequest sends an HTTP request to the xenon API at
// http://<host>:<utils.XenonPeerPort><url> using the given method and body,
// authenticating with HTTP basic auth as user "root" with rootPasswd.
//
// It returns the response body decoded by unmarshalJSON into a generic map.
// An error is returned when the request cannot be built or sent, when the
// response status is not 200 OK, or when the body cannot be decoded.
func xenonHttpRequest(host, method, url string, rootPasswd []byte, body io.Reader) (map[string]interface{}, error) {
	req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", host, utils.XenonPeerPort, url), body)
	if err != nil {
		return nil, err
	}
	encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...))
	req.Header.Set("Authorization", "Basic "+encoded)

	// NOTE(review): the client has no timeout, so an unresponsive xenon
	// blocks the caller indefinitely — consider setting http.Client.Timeout.
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked and cannot be reused.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("xenon request %s %s failed, status code is %d", method, url, resp.StatusCode)
	}

	var out map[string]interface{}
	if err = unmarshalJSON(resp.Body, &out); err != nil {
		return nil, err
	}
	return out, nil
}
46 changes: 14 additions & 32 deletions cluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ package syncer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/presslabs/controller-util/syncer"
Expand Down Expand Up @@ -207,9 +205,11 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
}

if isLeader == corev1.ConditionTrue && isReadOnly != corev1.ConditionFalse {
log.V(1).Info("try to correct the leader writeable", "node", node.Name)
runner.RunQuery("SET GLOBAL read_only=off")
runner.RunQuery("SET GLOBAL super_read_only=off")
if !s.Status.IgnoreTackle {
log.V(1).Info("try to correct the leader writeable", "node", node.Name)
runner.RunQuery("SET GLOBAL read_only=off")
runner.RunQuery("SET GLOBAL super_read_only=off")
}
}
}

Expand All @@ -224,7 +224,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
// update apiv1alpha1.NodeConditionReadOnly.
s.updateNodeCondition(node, 2, isReadOnly)

if err = setPodHealthy(ctx, cli, &pod, node); err != nil {
if err = s.setPodHealthy(ctx, &pod, node); err != nil {
log.Error(err, "cannot update pod", "name", podName, "namespace", pod.Namespace)
}
}
Expand Down Expand Up @@ -284,28 +284,9 @@ func (s *StatusSyncer) updateNodeCondition(node *apiv1alpha1.NodeStatus, idx int

// checkRole used to check whether the mysql role is leader.
func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) {
status := corev1.ConditionUnknown

req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/v1/raft/status", host, utils.XenonPeerPort), nil)
if err != nil {
return status, err
}
encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...))
req.Header.Set("Authorization", "Basic "+encoded)

client := &http.Client{}
resp, err := client.Do(req)
out, err := xenonHttpRequest(host, "GET", "/v1/raft/status", rootPasswd, nil)
if err != nil {
return status, err
}

if resp.StatusCode != 200 {
return status, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode)
}

var out map[string]interface{}
if err = unmarshalJSON(resp.Body, &out); err != nil {
return status, err
return corev1.ConditionUnknown, err
}

if out["state"] == "LEADER" {
Expand All @@ -316,27 +297,28 @@ func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) {
return corev1.ConditionFalse, nil
}

return status, nil
return corev1.ConditionUnknown, nil
}

// setPodHealthy sets the pod's healthy label.
func setPodHealthy(ctx context.Context, cli client.Client, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
healthy := "no"
if node.Conditions[0].Status == corev1.ConditionFalse {
if node.Conditions[1].Status == corev1.ConditionFalse &&
node.Conditions[2].Status == corev1.ConditionTrue &&
node.Conditions[3].Status == corev1.ConditionTrue {
healthy = "yes"
} else if node.Conditions[1].Status == corev1.ConditionTrue &&
node.Conditions[2].Status == corev1.ConditionFalse &&
node.Conditions[3].Status == corev1.ConditionFalse {
node.Conditions[3].Status == corev1.ConditionFalse &&
(node.Conditions[2].Status == corev1.ConditionFalse ||
node.Conditions[2].Status == corev1.ConditionTrue && s.Status.IgnoreTackle) {
healthy = "yes"
}
}

if pod.Labels["healthy"] != healthy {
pod.Labels["healthy"] = healthy
if err := cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil {
if err := s.cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil {
return err
}
}
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/mysql.radondb.com_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,9 @@ spec:
- type
type: object
type: array
ignoreTackle:
description: IgnoreTackle ignore the tackle.
type: boolean
nodes:
description: Nodes contains the list of the node status fulfilled.
items:
Expand Down
41 changes: 39 additions & 2 deletions internal/sql_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ var (
"waiting to reconnect after a failed binlog dump request",
"waiting to reconnect after a failed master event read",
}

processStates = []string{
"Master has sent all binlog to slave; waiting for more updates",
}
)

// SQLRunner is a runner for run the sql.
Expand Down Expand Up @@ -168,9 +172,42 @@ func (s *SQLRunner) RunQuery(query string, args ...interface{}) error {
}

// GetGlobalVariable used to get the global variable by param.
func (sr *SQLRunner) GetGlobalVariable(param string, val interface{}) error {
func (s *SQLRunner) GetGlobalVariable(param string, val interface{}) error {
query := fmt.Sprintf("select @@global.%s", param)
return sr.db.QueryRow(query).Scan(val)
return s.db.QueryRow(query).Scan(val)
}

// CheckProcesslist reports whether any row returned by "show processlist"
// has a State column value contained in processStates.
//
// It returns (true, nil) on the first matching row, (false, nil) when no row
// matches, and (false, err) when the query, a row scan, or the row iteration
// fails.
func (s *SQLRunner) CheckProcesslist() (bool, error) {
	// The State column and the values in processStates describe server
	// threads, which are listed by SHOW PROCESSLIST; SHOW SLAVE STATUS has
	// no State column and is empty on a server that is not a replica.
	rows, err := s.db.Query("show processlist;")
	if err != nil {
		return false, err
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return false, err
	}

	// One RawBytes destination per column so any result shape can be scanned.
	scanArgs := make([]interface{}, len(cols))
	for i := range scanArgs {
		scanArgs[i] = &sql.RawBytes{}
	}

	for rows.Next() {
		if err = rows.Scan(scanArgs...); err != nil {
			return false, err
		}

		state := columnValue(scanArgs, cols, "State")
		if stringInArray(state, processStates) {
			return true, nil
		}
	}

	// Surface an iteration error instead of reporting a clean "no match".
	return false, rows.Err()
}

// Close closes the database and prevents new queries from starting.
Expand Down

0 comments on commit edaf531

Please sign in to comment.