diff --git a/api/v1alpha1/cluster_types.go b/api/v1alpha1/cluster_types.go index 64baf79ac..63abbe433 100644 --- a/api/v1alpha1/cluster_types.go +++ b/api/v1alpha1/cluster_types.go @@ -266,6 +266,16 @@ type NodeCondition struct { LastTransitionTime metav1.Time `json:"lastTransitionTime"` } +// The index of the NodeStatus.Conditions. +type NodeConditionsIndex uint8 + +const ( + IndexLagged NodeConditionsIndex = iota + IndexLeader + IndexReadOnly + IndexReplicating +) + // NodeConditionType defines type for node condition type. type NodeConditionType string diff --git a/cluster/syncer/statefulset.go b/cluster/syncer/statefulset.go index 9896e599d..0eacbc2a1 100644 --- a/cluster/syncer/statefulset.go +++ b/cluster/syncer/statefulset.go @@ -18,8 +18,11 @@ package syncer import ( "context" + "encoding/base64" "errors" "fmt" + "io" + "net/http" "time" "github.com/iancoleman/strcase" @@ -39,6 +42,7 @@ import ( "github.com/radondb/radondb-mysql-kubernetes/cluster" "github.com/radondb/radondb-mysql-kubernetes/cluster/container" + "github.com/radondb/radondb-mysql-kubernetes/internal" "github.com/radondb/radondb-mysql-kubernetes/utils" ) @@ -93,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() } func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() } // Sync persists data into the external store. +// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68 func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) { var err error var kind string @@ -159,7 +164,7 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil. 
return controllerutil.OperationResultNone, err } - if err := s.updatePod(ctx, s.sfs); err != nil { + if err := s.updatePod(ctx); err != nil { return controllerutil.OperationResultNone, err } @@ -167,14 +172,14 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil. } // updatePod update the pods. -func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulSet) error { - if sfs.Status.UpdatedReplicas >= sfs.Status.Replicas { +func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { + if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas { return nil } log.Info("statefulSet was changed, run update") - if sfs.Status.ReadyReplicas < sfs.Status.Replicas { + if s.sfs.Status.ReadyReplicas < s.sfs.Status.Replicas { log.Info("can't start/continue 'update': waiting for all replicas are ready") return nil } @@ -183,15 +188,15 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS if err := s.cli.List(ctx, &pods, &client.ListOptions{ - Namespace: sfs.Namespace, + Namespace: s.sfs.Namespace, LabelSelector: s.GetLabels().AsSelector(), }, ); err != nil { return err } - // get the leader pod. 
var leaderPod corev1.Pod + var followerPods []corev1.Pod for _, pod := range pods.Items { if pod.ObjectMeta.Labels["healthy"] != "yes" { return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name) @@ -199,23 +204,111 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" { leaderPod = pod - log.Info("get leader pod", "pod name", leaderPod.Name) continue } + followerPods = append(followerPods, pod) if err := s.applyNWait(ctx, &pod); err != nil { return err } } if leaderPod.Name != "" { - log.Info("apply changes to leader pod", "pod name", leaderPod.Name) + if len(followerPods) == 1 { + if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil { + return err + } + } if err := s.applyNWait(ctx, &leaderPod); err != nil { return err } } - log.Info("update finished") + return nil +} + +// preUpdate run before update the leader pod when replica is 2. +func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error { + if s.sfs.Status.Replicas != 2 { + return nil + } + + sctName := s.GetNameForResource(utils.Secret) + svcName := s.GetNameForResource(utils.HeadlessSVC) + port := utils.MysqlPort + nameSpace := s.Namespace + + // get secrets. 
+	secret := &corev1.Secret{} +	if err := s.cli.Get(ctx, +		types.NamespacedName{ +			Namespace: nameSpace, +			Name:      sctName, +		}, +		secret, +	); err != nil { +		return fmt.Errorf("failed to get the secret: %s", sctName) +	} +	user, ok := secret.Data["operator-user"] +	if !ok { +		return fmt.Errorf("failed to get the operator-user from secret: %s", sctName) +	} +	password, ok := secret.Data["operator-password"] +	if !ok { +		return fmt.Errorf("failed to get the operator-password from secret: %s", sctName) +	} +	rootPasswd, ok := secret.Data["root-password"] +	if !ok { +		return fmt.Errorf("failed to get the root-password from secret: %s", sctName) +	} + +	leaderHost := fmt.Sprintf("%s.%s.%s", leader, svcName, nameSpace) +	leaderRunner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), leaderHost, port) +	if err != nil { +		log.Error(err, "failed to connect the mysql", "node", leader) +		return err +	} +	defer leaderRunner.Close() + +	if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) { +		// set leader read only. +		if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil { +			log.Error(err, "failed to set leader read only", "node", leader) +			return false, err +		} + +		// make sure the master has sent all binlogs to the slave. +		success, err := leaderRunner.CheckProcesslist() +		if err != nil { +			return false, err +		} +		if success { +			return true, nil +		} +		return false, nil +	}); err != nil { +		return err +	} + +	followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace) +	if err = retry(time.Second*5, time.Second*60, func() (bool, error) { +		// check whether the follower has become the leader. +		status, err := checkRole(followerHost, rootPasswd) +		if err != nil { +			log.Error(err, "failed to check role", "pod", follower) +			return false, nil +		} +		if status == corev1.ConditionTrue { +			return true, nil +		} + +		// If the follower is still not the leader, try to promote it to leader.
+		if body, err := xenonHttpRequest(followerHost, "POST", "/v1/raft/trytoleader", rootPasswd, nil); err != nil { log.Error(err, "failed to try to leader", "pod", follower) } else { body.Close() } +		return false, nil +	}); err != nil { +		return err +	} +	return nil } @@ -312,31 +405,19 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err } } -	// wait the pod restart. +	// wait until the pod is restarted and healthy. 	return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) { 		err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod) 		if err != nil && !k8serrors.IsNotFound(err) { 			return false, err 		} -		var xenonReady, mysqlReady bool -		for _, container := range pod.Status.ContainerStatuses { -			if container.Name == "xenon" { -				xenonReady = container.Ready -			} -			if container.Name == "mysql" { -				mysqlReady = container.Ready -			} -		} - 		if pod.Status.Phase == corev1.PodFailed { 			return false, fmt.Errorf("pod %s is in failed phase", pod.Name) 		} -		if pod.Status.Phase == corev1.PodRunning && xenonReady && mysqlReady && -			pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision && -			pod.ObjectMeta.Labels["healthy"] == "yes" { -			log.Info("pod is running", "pod name", pod.Name) +		if pod.ObjectMeta.Labels["healthy"] == "yes" && +			pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision { 			return true, nil 		} @@ -384,3 +465,24 @@ func basicEventReason(objKindName string, err error) string { 	return fmt.Sprintf("%sSyncSuccessfull", strcase.ToCamel(objKindName)) } + +func xenonHttpRequest(host, method, url string, rootPasswd []byte, body io.Reader) (io.ReadCloser, error) { +	req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", host, utils.XenonPeerPort, url), body) +	if err != nil { +		return nil, err +	} +	encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...)) +	req.Header.Set("Authorization", "Basic "+encoded) + +	client := &http.Client{Timeout: time.Second * 5} +	resp, err := client.Do(req) +	if err != nil { +		return nil, err +	} + +	if 
resp.StatusCode != 200 { +		resp.Body.Close(); return nil, fmt.Errorf("xenon http request %s failed, status code is %d", url, resp.StatusCode) +	} + +	return resp.Body, nil +} diff --git a/cluster/syncer/status.go b/cluster/syncer/status.go index ca71e1ebe..02df39045 100644 --- a/cluster/syncer/status.go +++ b/cluster/syncer/status.go @@ -18,12 +18,10 @@ package syncer import ( 	"context" -	"encoding/base64" 	"encoding/json" 	"fmt" 	"io" 	"io/ioutil" -	"net/http" 	"time" 	"github.com/presslabs/controller-util/syncer" @@ -186,7 +184,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, 			node.Message = err.Error() 		} 		// update apiv1alpha1.NodeConditionLeader. -		s.updateNodeCondition(node, 1, isLeader) +		s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader) 		isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown 		runner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), host, port) @@ -218,13 +216,13 @@ } 		// update apiv1alpha1.NodeConditionLagged. -		s.updateNodeCondition(node, 0, isLagged) +		s.updateNodeCondition(node, int(apiv1alpha1.IndexLagged), isLagged) 		// update apiv1alpha1.NodeConditionReplicating. -		s.updateNodeCondition(node, 3, isReplicating) +		s.updateNodeCondition(node, int(apiv1alpha1.IndexReplicating), isReplicating) 		// update apiv1alpha1.NodeConditionReadOnly. -		s.updateNodeCondition(node, 2, isReadOnly) +		s.updateNodeCondition(node, int(apiv1alpha1.IndexReadOnly), isReadOnly) -		if err = setPodHealthy(ctx, cli, &pod, node); err != nil { +		if err = s.setPodHealthy(ctx, &pod, node); err != nil { 			log.Error(err, "cannot update pod", "name", podName, "namespace", pod.Namespace) 		} 	} @@ -284,28 +282,14 @@ // checkRole used to check whether the mysql role is leader. 
func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) { - status := corev1.ConditionUnknown - - req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/v1/raft/status", host, utils.XenonPeerPort), nil) - if err != nil { - return status, err - } - encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...)) - req.Header.Set("Authorization", "Basic "+encoded) - - client := &http.Client{} - resp, err := client.Do(req) + body, err := xenonHttpRequest(host, "GET", "/v1/raft/status", rootPasswd, nil) if err != nil { - return status, err - } - - if resp.StatusCode != 200 { - return status, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode) + return corev1.ConditionUnknown, err } var out map[string]interface{} - if err = unmarshalJSON(resp.Body, &out); err != nil { - return status, err + if err = unmarshalJSON(body, &out); err != nil { + return corev1.ConditionUnknown, err } if out["state"] == "LEADER" { @@ -316,27 +300,27 @@ func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) { return corev1.ConditionFalse, nil } - return status, nil + return corev1.ConditionUnknown, nil } // setPodHealthy set the pod lable healthy. 
-func setPodHealthy(ctx context.Context, cli client.Client, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error { +func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error { healthy := "no" - if node.Conditions[0].Status == corev1.ConditionFalse { - if node.Conditions[1].Status == corev1.ConditionFalse && - node.Conditions[2].Status == corev1.ConditionTrue && - node.Conditions[3].Status == corev1.ConditionTrue { + if node.Conditions[apiv1alpha1.IndexLagged].Status == corev1.ConditionFalse { + if node.Conditions[apiv1alpha1.IndexLeader].Status == corev1.ConditionFalse && + node.Conditions[apiv1alpha1.IndexReadOnly].Status == corev1.ConditionTrue && + node.Conditions[apiv1alpha1.IndexReplicating].Status == corev1.ConditionTrue { healthy = "yes" - } else if node.Conditions[1].Status == corev1.ConditionTrue && - node.Conditions[2].Status == corev1.ConditionFalse && - node.Conditions[3].Status == corev1.ConditionFalse { + } else if node.Conditions[apiv1alpha1.IndexLeader].Status == corev1.ConditionTrue && + node.Conditions[apiv1alpha1.IndexReplicating].Status == corev1.ConditionFalse && + node.Conditions[apiv1alpha1.IndexReadOnly].Status == corev1.ConditionFalse { healthy = "yes" } } if pod.Labels["healthy"] != healthy { pod.Labels["healthy"] = healthy - if err := cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil { + if err := s.cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil { return err } } diff --git a/internal/sql_runner.go b/internal/sql_runner.go index 035f7fd98..ac595e306 100644 --- a/internal/sql_runner.go +++ b/internal/sql_runner.go @@ -168,9 +168,42 @@ func (s *SQLRunner) RunQuery(query string, args ...interface{}) error { } // GetGlobalVariable used to get the global variable by param. 
-func (sr *SQLRunner) GetGlobalVariable(param string, val interface{}) error { +func (s *SQLRunner) GetGlobalVariable(param string, val interface{}) error { 	query := fmt.Sprintf("select @@global.%s", param) -	return sr.db.QueryRow(query).Scan(val) +	return s.db.QueryRow(query).Scan(val) +} + +func (s *SQLRunner) CheckProcesslist() (bool, error) { +	var rows *sql.Rows +	rows, err := s.db.Query("show processlist;") +	if err != nil { +		return false, err +	} + +	defer rows.Close() + +	var cols []string +	cols, err = rows.Columns() +	if err != nil { +		return false, err +	} + +	scanArgs := make([]interface{}, len(cols)) +	for i := range scanArgs { +		scanArgs[i] = &sql.RawBytes{} +	} + +	for rows.Next() { +		if err = rows.Scan(scanArgs...); err != nil { +			return false, err +		} + +		state := columnValue(scanArgs, cols, "State") +		if strings.Contains(state, "Master has sent all binlog to slave") { +			return true, nil +		} +	} +	return false, rows.Err() } // Close closes the database and prevents new queries from starting.