diff --git a/cmd/manager/main.go b/cmd/manager/main.go index eceafe69f..3fb21ca81 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -84,6 +84,7 @@ func main() { Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("controller.mysqlcluster"), SQLRunnerFactory: internal.NewSQLRunner, + XenonExecutor: internal.NewXenonExecutor(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "MysqlCluster") os.Exit(1) @@ -93,6 +94,7 @@ func main() { Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("controller.status"), SQLRunnerFactory: internal.NewSQLRunner, + XenonExecutor: internal.NewXenonExecutor(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Status") os.Exit(1) diff --git a/controllers/mysqlcluster_controller.go b/controllers/mysqlcluster_controller.go index 31a850f19..8667d4de0 100644 --- a/controllers/mysqlcluster_controller.go +++ b/controllers/mysqlcluster_controller.go @@ -46,6 +46,8 @@ type MysqlClusterReconciler struct { // Mysql query runner. 
internal.SQLRunnerFactory + + internal.XenonExecutor } // +kubebuilder:rbac:groups=mysql.radondb.com,resources=mysqlclusters,verbs=get;list;watch;create;update;patch;delete @@ -110,6 +112,8 @@ func (r *MysqlClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request cmRev := configMapSyncer.Object().(*corev1.ConfigMap).ResourceVersion sctRev := secretSyncer.Object().(*corev1.Secret).ResourceVersion + r.XenonExecutor.SetRootPassword(instance.Spec.MysqlOpts.RootPassword) + // run the syncers for services, pdb and statefulset syncers := []syncer.Interface{ clustersyncer.NewRoleSyncer(r.Client, instance), @@ -118,7 +122,7 @@ func (r *MysqlClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request clustersyncer.NewHeadlessSVCSyncer(r.Client, instance), clustersyncer.NewLeaderSVCSyncer(r.Client, instance), clustersyncer.NewFollowerSVCSyncer(r.Client, instance), - clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev, r.SQLRunnerFactory), + clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev, r.SQLRunnerFactory, r.XenonExecutor), clustersyncer.NewPDBSyncer(r.Client, instance), } diff --git a/controllers/mysqluser_controller.go b/controllers/mysqluser_controller.go index ba23c28c1..672db866c 100644 --- a/controllers/mysqluser_controller.go +++ b/controllers/mysqluser_controller.go @@ -189,7 +189,7 @@ func (r *MysqlUserReconciler) reconcileUserInDB(ctx context.Context, mysqlUser * } // Remove allowed hosts for user. 
- toRemove := stringDiffIn(mysqlUser.Status.AllowedHosts, mysqlUser.Spec.Hosts) + toRemove := utils.StringDiffIn(mysqlUser.Status.AllowedHosts, mysqlUser.Spec.Hosts) for _, host := range toRemove { if err := internal.DropUser(sqlRunner, mysqlUser.Spec.User, host); err != nil { return err @@ -199,27 +199,6 @@ func (r *MysqlUserReconciler) reconcileUserInDB(ctx context.Context, mysqlUser * return nil } -func stringDiffIn(actual, desired []string) []string { - diff := []string{} - for _, aStr := range actual { - // If is not in the desired list remove it. - if _, exists := stringIn(aStr, desired); !exists { - diff = append(diff, aStr) - } - } - - return diff -} - -func stringIn(str string, strs []string) (int, bool) { - for i, s := range strs { - if s == str { - return i, true - } - } - return 0, false -} - func (r *MysqlUserReconciler) dropUserFromDB(ctx context.Context, mysqlUser *mysqluser.MysqlUser) error { sqlRunner, closeConn, err := r.SQLRunnerFactory(internal.NewConfigFromClusterKey( r.Client, mysqlUser.GetClusterKey(), utils.RootUser, utils.LeaderHost)) diff --git a/controllers/status_controller.go b/controllers/status_controller.go index 821212685..97da310c4 100644 --- a/controllers/status_controller.go +++ b/controllers/status_controller.go @@ -54,6 +54,8 @@ type StatusReconciler struct { // Mysql query runner. 
internal.SQLRunnerFactory + + internal.XenonExecutor } // Reconcile is part of the main kubernetes reconciliation loop which aims to @@ -92,7 +94,9 @@ func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } }() - statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client, r.SQLRunnerFactory) + r.XenonExecutor.SetRootPassword(instance.Spec.MysqlOpts.RootPassword) + + statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client, r.SQLRunnerFactory, r.XenonExecutor) if err := syncer.Sync(ctx, statusSyncer, r.Recorder); err != nil { return ctrl.Result{}, err } diff --git a/mysqlcluster/syncer/statefulset.go b/mysqlcluster/syncer/statefulset.go index aec95269a..a1eb57c3d 100644 --- a/mysqlcluster/syncer/statefulset.go +++ b/mysqlcluster/syncer/statefulset.go @@ -18,11 +18,8 @@ package syncer import ( "context" - "encoding/base64" "errors" "fmt" - "io" - "net/http" "time" "github.com/iancoleman/strcase" @@ -41,7 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1" - "github.com/radondb/radondb-mysql-kubernetes/internal" + . "github.com/radondb/radondb-mysql-kubernetes/internal" "github.com/radondb/radondb-mysql-kubernetes/mysqlcluster" "github.com/radondb/radondb-mysql-kubernetes/mysqlcluster/container" "github.com/radondb/radondb-mysql-kubernetes/utils" @@ -65,11 +62,13 @@ type StatefulSetSyncer struct { sctRev string // Mysql query runner. - internal.SQLRunnerFactory + SQLRunnerFactory + + XenonExecutor } // NewStatefulSetSyncer returns a pointer to StatefulSetSyncer. 
-func NewStatefulSetSyncer(cli client.Client, c *mysqlcluster.MysqlCluster, cmRev, sctRev string, sqlRunnerFactory internal.SQLRunnerFactory) *StatefulSetSyncer { +func NewStatefulSetSyncer(cli client.Client, c *mysqlcluster.MysqlCluster, cmRev, sctRev string, sqlRunnerFactory SQLRunnerFactory, xenonExecutor XenonExecutor) *StatefulSetSyncer { return &StatefulSetSyncer{ MysqlCluster: c, cli: cli, @@ -86,6 +85,7 @@ func NewStatefulSetSyncer(cli client.Client, c *mysqlcluster.MysqlCluster, cmRev cmRev: cmRev, sctRev: sctRev, SQLRunnerFactory: sqlRunnerFactory, + XenonExecutor: xenonExecutor, } } @@ -360,7 +360,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { // 5. Check followerHost current role. // 6. If followerHost is not leader, switch it to leader through xenon. func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error { - leaderRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey( + leaderRunner, closeConn, err := s.SQLRunnerFactory(NewConfigFromClusterKey( s.cli, s.MysqlCluster.GetClusterKey(), utils.OperatorUser, utils.LeaderHost)) if err != nil { return err @@ -402,13 +402,13 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) { // Set leader read only. - if err = leaderRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=on;")); err != nil { + if err = leaderRunner.QueryExec(NewQuery("SET GLOBAL super_read_only=on;")); err != nil { log.Error(err, "failed to set leader read only", "node", leader) return false, err } // Make sure the master has sent all binlog to slave. 
- success, err := internal.CheckProcesslist(leaderRunner) + success, err := CheckProcesslist(leaderRunner) if err != nil { return false, err } @@ -422,18 +422,14 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace) if err = retry(time.Second*5, time.Second*60, func() (bool, error) { - // Check whether is leader. - status, err := checkRole(followerHost, rootPasswd) - if err != nil { - log.Error(err, "failed to check role", "pod", follower) - return false, nil - } - if status == corev1.ConditionTrue { - return true, nil + for _, node := range s.Status.Nodes { + if node.Name == followerHost && node.RaftStatus.Role == string(utils.Leader) { + return true, nil + } } // If not leader, try to leader. - xenonHttpRequest(followerHost, "POST", "/v1/raft/trytoleader", rootPasswd, nil) + s.XenonExecutor.RaftTryToLeader(followerHost) return false, nil }); err != nil { return err @@ -656,27 +652,6 @@ func basicEventReason(objKindName string, err error) string { return fmt.Sprintf("%sSyncSuccessfull", strcase.ToCamel(objKindName)) } -func xenonHttpRequest(host, method, url string, rootPasswd []byte, body io.Reader) (io.ReadCloser, error) { - req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", host, utils.XenonPeerPort, url), body) - if err != nil { - return nil, err - } - encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...)) - req.Header.Set("Authorization", "Basic "+encoded) - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return nil, err - } - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode) - } - - return resp.Body, nil -} - // check the backup is exist and running func (s *StatefulSetSyncer) backupIsRunning(ctx context.Context) (bool, error) { backuplist := apiv1alpha1.BackupList{} diff --git a/mysqlcluster/syncer/status.go 
b/mysqlcluster/syncer/status.go index 589197802..29e0da392 100644 --- a/mysqlcluster/syncer/status.go +++ b/mysqlcluster/syncer/status.go @@ -18,21 +18,17 @@ package syncer import ( "context" - "encoding/json" "fmt" - "io" - "io/ioutil" "time" "github.com/presslabs/controller-util/syncer" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1" - "github.com/radondb/radondb-mysql-kubernetes/internal" + . "github.com/radondb/radondb-mysql-kubernetes/internal" "github.com/radondb/radondb-mysql-kubernetes/mysqlcluster" "github.com/radondb/radondb-mysql-kubernetes/utils" ) @@ -50,15 +46,18 @@ type StatusSyncer struct { cli client.Client // Mysql query runner. - internal.SQLRunnerFactory + SQLRunnerFactory + + XenonExecutor } // NewStatusSyncer returns a pointer to StatusSyncer. -func NewStatusSyncer(c *mysqlcluster.MysqlCluster, cli client.Client, sqlRunnerFactory internal.SQLRunnerFactory) *StatusSyncer { +func NewStatusSyncer(c *mysqlcluster.MysqlCluster, cli client.Client, sqlRunnerFactory SQLRunnerFactory, xenonExecutor XenonExecutor) *StatusSyncer { return &StatusSyncer{ MysqlCluster: c, cli: cli, SQLRunnerFactory: sqlRunnerFactory, + XenonExecutor: xenonExecutor, } } @@ -177,68 +176,41 @@ func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition { // updateNodeStatus update the node status. 
func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, pods []corev1.Pod) error { - sctName := s.GetNameForResource(utils.Secret) - svcName := s.GetNameForResource(utils.HeadlessSVC) - nameSpace := s.Namespace - - secret := &corev1.Secret{} - if err := cli.Get(context.TODO(), - types.NamespacedName{ - Namespace: nameSpace, - Name: sctName, - }, - secret, - ); err != nil { - log.V(1).Info("secret not found", "name", sctName) - return nil - } - - rootPasswd, ok := secret.Data["root-password"] - if !ok { - return fmt.Errorf("failed to get the root password: %s", rootPasswd) - } - for _, pod := range pods { podName := pod.Name - host := fmt.Sprintf("%s.%s.%s", podName, svcName, nameSpace) + host := fmt.Sprintf("%s.%s.%s", podName, s.GetNameForResource(utils.HeadlessSVC), s.Namespace) index := s.getNodeStatusIndex(host) node := &s.Status.Nodes[index] node.Message = "" - isLeader, err := checkRole(host, rootPasswd) - if err != nil { - log.Error(err, "failed to check the node role", "node", node.Name) - node.Message = err.Error() - } - // update apiv1alpha1.NodeConditionLeader. 
- s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader) + s.updateNodeRaftStatus(node) isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown - sqlRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey( + sqlRunner, closeConn, err := s.SQLRunnerFactory(NewConfigFromClusterKey( s.cli, s.MysqlCluster.GetClusterKey(), utils.OperatorUser, host)) defer closeConn() if err != nil { log.Error(err, "failed to connect the mysql", "node", node.Name) node.Message = err.Error() } else { - isLagged, isReplicating, err = internal.CheckSlaveStatusWithRetry(sqlRunner, checkNodeStatusRetry) + isLagged, isReplicating, err = CheckSlaveStatusWithRetry(sqlRunner, checkNodeStatusRetry) if err != nil { log.Error(err, "failed to check slave status", "node", node.Name) node.Message = err.Error() } - isReadOnly, err = internal.CheckReadOnly(sqlRunner) + isReadOnly, err = CheckReadOnly(sqlRunner) if err != nil { log.Error(err, "failed to check read only", "node", node.Name) node.Message = err.Error() } if !utils.ExistUpdateFile() && - isLeader == corev1.ConditionTrue && + node.RaftStatus.Role == string(utils.Leader) && isReadOnly != corev1.ConditionFalse { log.V(1).Info("try to correct the leader writeable", "node", node.Name) - sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL read_only=off")) - sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=off")) + sqlRunner.QueryExec(NewQuery("SET GLOBAL read_only=off")) + sqlRunner.QueryExec(NewQuery("SET GLOBAL super_read_only=off")) } } @@ -307,27 +279,21 @@ func (s *StatusSyncer) updateNodeCondition(node *apiv1alpha1.NodeStatus, idx int } } -// checkRole used to check whether the mysql role is leader. -func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) { - body, err := xenonHttpRequest(host, "GET", "/v1/raft/status", rootPasswd, nil) +// updateNodeRaftStatus updates the node's raft status and its leader condition.
+func (s *StatusSyncer) updateNodeRaftStatus(node *apiv1alpha1.NodeStatus) error { + raftStatus, err := s.XenonExecutor.RaftStatus(node.Name) if err != nil { - return corev1.ConditionUnknown, err - } - - var out map[string]interface{} - if err = unmarshalJSON(body, &out); err != nil { - return corev1.ConditionUnknown, err - } - - if out["state"] == "LEADER" { - return corev1.ConditionTrue, nil + return err } + node.RaftStatus = *raftStatus - if out["state"] == "FOLLOWER" { - return corev1.ConditionFalse, nil + isLeader := corev1.ConditionFalse + if node.RaftStatus.Role == string(utils.Leader) { + isLeader = corev1.ConditionTrue } - - return corev1.ConditionUnknown, nil + // update apiv1alpha1.NodeConditionLeader. + s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader) + return nil } // setPodHealthy set the pod lable healthy. @@ -353,17 +319,3 @@ func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node } return nil } - -func unmarshalJSON(in io.Reader, obj interface{}) error { - body, err := ioutil.ReadAll(in) - if err != nil { - return fmt.Errorf("io read error: %s", err) - } - - if err = json.Unmarshal(body, obj); err != nil { - log.V(1).Info("error unmarshal data", "body", string(body)) - return err - } - - return nil -} diff --git a/utils/common.go b/utils/common.go index 1ad2306a2..1d41e9be7 100644 --- a/utils/common.go +++ b/utils/common.go @@ -17,7 +17,10 @@ limitations under the License. 
package utils import ( + "encoding/json" "fmt" + "io" + "io/ioutil" "os" "sort" "strconv" @@ -120,3 +123,16 @@ func stringIn(str string, strs []string) (int, bool) { return 0, false } + +func UnmarshalJSON(in io.Reader, obj interface{}) error { + body, err := ioutil.ReadAll(in) + if err != nil { + return fmt.Errorf("io read error: %s", err) + } + + if err = json.Unmarshal(body, obj); err != nil { + return fmt.Errorf("error unmarshal data, error: %s, body: %s", err, string(body)) + } + + return nil +} diff --git a/utils/constants.go b/utils/constants.go index edc3a7af3..f31a8ca58 100644 --- a/utils/constants.go +++ b/utils/constants.go @@ -16,6 +16,8 @@ limitations under the License. package utils +import "net/http" + var ( // MySQLDefaultVersion is the version for mysql that should be used MySQLDefaultVersion = "5.7.34" @@ -30,6 +32,11 @@ var ( "5.7.33": "percona/percona-server:5.7.33", "5.7.34": "percona/percona-server:5.7.34", } + // XenonHttpUrls saves the xenon http url and its corresponding request type. + XenonHttpUrls = map[string]string{ + "/v1/raft/status": http.MethodGet, + "/v1/raft/trytoleader": http.MethodPost, + } ) const ( @@ -134,3 +141,20 @@ const ( // JobType const BackupJobTypeName = ContainerBackupName + +// RaftRole is the role of the node in raft. +type RaftRole string + +const ( + Leader RaftRole = "LEADER" + Follower RaftRole = "FOLLOWER" + Candidate RaftRole = "CANDIDATE" +) + +// XenonHttpUrl is a http url corresponding to the xenon instruction. +type XenonHttpUrl string + +const ( + RaftStatus XenonHttpUrl = "/v1/raft/status" + RaftTryToLeader XenonHttpUrl = "/v1/raft/trytoleader" +)