diff --git a/cluster/syncer/statefulset.go b/cluster/syncer/statefulset.go index 9896e599d..c84350b1e 100644 --- a/cluster/syncer/statefulset.go +++ b/cluster/syncer/statefulset.go @@ -18,8 +18,11 @@ package syncer import ( "context" + "encoding/base64" "errors" "fmt" + "io" + "net/http" "time" "github.com/iancoleman/strcase" @@ -39,6 +42,7 @@ import ( "github.com/radondb/radondb-mysql-kubernetes/cluster" "github.com/radondb/radondb-mysql-kubernetes/cluster/container" + "github.com/radondb/radondb-mysql-kubernetes/internal" "github.com/radondb/radondb-mysql-kubernetes/utils" ) @@ -159,7 +163,7 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil. return controllerutil.OperationResultNone, err } - if err := s.updatePod(ctx, s.sfs); err != nil { + if err := s.updatePod(ctx); err != nil { return controllerutil.OperationResultNone, err } @@ -167,14 +171,14 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil. } // updatePod update the pods. -func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulSet) error { - if sfs.Status.UpdatedReplicas >= sfs.Status.Replicas { +func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { + if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas { return nil } log.Info("statefulSet was changed, run update") - if sfs.Status.ReadyReplicas < sfs.Status.Replicas { + if s.sfs.Status.ReadyReplicas < s.sfs.Status.Replicas { log.Info("can't start/continue 'update': waiting for all replicas are ready") return nil } @@ -183,15 +187,15 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS if err := s.cli.List(ctx, &pods, &client.ListOptions{ - Namespace: sfs.Namespace, + Namespace: s.sfs.Namespace, LabelSelector: s.GetLabels().AsSelector(), }, ); err != nil { return err } - // get the leader pod. var leaderPod corev1.Pod + var followerPods []corev1.Pod for _, pod := range pods.Items { if pod.ObjectMeta.Labels["healthy"] != "yes" { return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name) @@ -199,23 +203,112 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" { leaderPod = pod - log.Info("get leader pod", "pod name", leaderPod.Name) continue } + followerPods = append(followerPods, pod) if err := s.applyNWait(ctx, &pod); err != nil { return err } } if leaderPod.Name != "" { - log.Info("apply changes to leader pod", "pod name", leaderPod.Name) + if len(followerPods) == 1 { + if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil { + return err + } + } if err := s.applyNWait(ctx, &leaderPod); err != nil { return err } } - log.Info("update finished") + return nil +} + +func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error { + if s.sfs.Status.Replicas != 2 { + return nil + } + + sctName := s.GetNameForResource(utils.Secret) + svcName := s.GetNameForResource(utils.HeadlessSVC) + port := utils.MysqlPort + nameSpace := s.Namespace + + // get secrets. + secret := &corev1.Secret{} + if err := s.cli.Get(context.TODO(), + types.NamespacedName{ + Namespace: nameSpace, + Name: sctName, + }, + secret, + ); err != nil { + return fmt.Errorf("failed to get the secret: %s", sctName) + } + user, ok := secret.Data["operator-user"] + if !ok { + return fmt.Errorf("failed to get the user: %s", user) + } + password, ok := secret.Data["operator-password"] + if !ok { + return fmt.Errorf("failed to get the password: %s", password) + } + rootPasswd, ok := secret.Data["root-password"] + if !ok { + return fmt.Errorf("failed to get the root password: %s", rootPasswd) + } + + leaderHost := fmt.Sprintf("%s.%s.%s", leader, svcName, nameSpace) + leaderRunner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), leaderHost, port) + if err != nil { + log.Error(err, "failed to connect the mysql", "node", leader) + return err + } + defer leaderRunner.Close() + // set leader read only. + if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil { + log.Error(err, "failed to set leader read only", "node", leader) + return err + } + // make sure the master has sent all binlog to slave. + if err = retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) { + success, err := leaderRunner.CheckProcesslist() + if err != nil { + return false, err + } + if success { + return true, nil + } + return false, nil + }); err != nil { + return err + } + + followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace) + if err = retry(time.Second*10, time.Second*60, func() (bool, error) { + // follower try to leader. + if _, err = xenonHttpRequest(followerHost, "POST", "/v1/raft/trytoleader", rootPasswd, nil); err != nil { + log.Error(err, "failed to try to leader", "pod", follower) + return false, nil + } + time.Sleep(time.Second * 5) + + status, err := checkRole(followerHost, rootPasswd) + if err != nil { + log.Error(err, "failed to check role", "pod", follower) + return false, nil + } + if status != corev1.ConditionTrue { + return false, nil + } + + return true, nil + }); err != nil { + return err + } + return nil } @@ -336,7 +429,6 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err if pod.Status.Phase == corev1.PodRunning && xenonReady && mysqlReady && pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision && pod.ObjectMeta.Labels["healthy"] == "yes" { - log.Info("pod is running", "pod name", pod.Name) return true, nil } @@ -384,3 +476,24 @@ func basicEventReason(objKindName string, err error) string { return fmt.Sprintf("%sSyncSuccessfull", strcase.ToCamel(objKindName)) } + +func xenonHttpRequest(host, method, url string, rootPasswd []byte, body io.Reader) (io.ReadCloser, error) { + req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", host, utils.XenonPeerPort, url), body) + if err != nil { + return nil, err + } + encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...)) + req.Header.Set("Authorization", "Basic "+encoded) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode) + } + + return resp.Body, nil +} diff --git a/cluster/syncer/status.go b/cluster/syncer/status.go index ca71e1ebe..9ffdd41b7 100644 --- a/cluster/syncer/status.go +++ b/cluster/syncer/status.go @@ -18,12 +18,10 @@ package syncer import ( "context" - "encoding/base64" "encoding/json" "fmt" "io" "io/ioutil" - "net/http" "time" "github.com/presslabs/controller-util/syncer" @@ -205,12 +203,6 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, log.Error(err, "failed to check read only", "node", node.Name) node.Message = err.Error() } - - if isLeader == corev1.ConditionTrue && isReadOnly != corev1.ConditionFalse { - log.V(1).Info("try to correct the leader writeable", "node", node.Name) - runner.RunQuery("SET GLOBAL read_only=off") - runner.RunQuery("SET GLOBAL super_read_only=off") - } } if runner != nil { @@ -224,7 +216,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, // update apiv1alpha1.NodeConditionReadOnly. s.updateNodeCondition(node, 2, isReadOnly) - if err = setPodHealthy(ctx, cli, &pod, node); err != nil { + if err = s.setPodHealthy(ctx, &pod, node); err != nil { log.Error(err, "cannot update pod", "name", podName, "namespace", pod.Namespace) } } @@ -284,28 +276,14 @@ func (s *StatusSyncer) updateNodeCondition(node *apiv1alpha1.NodeStatus, idx int // checkRole used to check whether the mysql role is leader. func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) { - status := corev1.ConditionUnknown - - req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/v1/raft/status", host, utils.XenonPeerPort), nil) - if err != nil { - return status, err - } - encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...)) - req.Header.Set("Authorization", "Basic "+encoded) - - client := &http.Client{} - resp, err := client.Do(req) + body, err := xenonHttpRequest(host, "GET", "/v1/raft/status", rootPasswd, nil) if err != nil { - return status, err - } - - if resp.StatusCode != 200 { - return status, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode) + return corev1.ConditionUnknown, err } var out map[string]interface{} - if err = unmarshalJSON(resp.Body, &out); err != nil { - return status, err + if err = unmarshalJSON(body, &out); err != nil { + return corev1.ConditionUnknown, err } if out["state"] == "LEADER" { @@ -316,11 +294,11 @@ func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) { return corev1.ConditionFalse, nil } - return status, nil + return corev1.ConditionUnknown, nil } // setPodHealthy set the pod lable healthy. -func setPodHealthy(ctx context.Context, cli client.Client, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error { +func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error { healthy := "no" if node.Conditions[0].Status == corev1.ConditionFalse { if node.Conditions[1].Status == corev1.ConditionFalse && @@ -328,15 +306,16 @@ func setPodHealthy(ctx context.Context, cli client.Client, pod *corev1.Pod, node node.Conditions[3].Status == corev1.ConditionTrue { healthy = "yes" } else if node.Conditions[1].Status == corev1.ConditionTrue && - node.Conditions[2].Status == corev1.ConditionFalse && - node.Conditions[3].Status == corev1.ConditionFalse { + node.Conditions[3].Status == corev1.ConditionFalse && + (node.Conditions[2].Status == corev1.ConditionFalse || + node.Conditions[2].Status == corev1.ConditionTrue) { healthy = "yes" } } if pod.Labels["healthy"] != healthy { pod.Labels["healthy"] = healthy - if err := cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil { + if err := s.cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil { return err } } diff --git a/internal/sql_runner.go b/internal/sql_runner.go index 035f7fd98..ac595e306 100644 --- a/internal/sql_runner.go +++ b/internal/sql_runner.go @@ -168,9 +168,42 @@ func (s *SQLRunner) RunQuery(query string, args ...interface{}) error { } // GetGlobalVariable used to get the global variable by param. -func (sr *SQLRunner) GetGlobalVariable(param string, val interface{}) error { +func (s *SQLRunner) GetGlobalVariable(param string, val interface{}) error { query := fmt.Sprintf("select @@global.%s", param) - return sr.db.QueryRow(query).Scan(val) + return s.db.QueryRow(query).Scan(val) +} + +func (s *SQLRunner) CheckProcesslist() (bool, error) { + var rows *sql.Rows + rows, err := s.db.Query("show processlist;") + if err != nil { + return false, err + } + + defer rows.Close() + + var cols []string + cols, err = rows.Columns() + if err != nil { + return false, err + } + + scanArgs := make([]interface{}, len(cols)) + for i := range scanArgs { + scanArgs[i] = &sql.RawBytes{} + } + + for rows.Next() { + if err = rows.Scan(scanArgs...); err != nil { + return false, err + } + + state := columnValue(scanArgs, cols, "State") + if strings.Contains(state, "Master has sent all binlog to slave") { + return true, nil + } + } + return false, nil } // Close closes the database and prevents new queries from starting.