From 53edf42156f862aad05b0d61658fbfd30aa17fa1 Mon Sep 17 00:00:00 2001
From: runkecheng <1131648942@qq.com>
Date: Thu, 4 Nov 2021 17:20:32 +0800
Subject: [PATCH 1/3] api: Add cluster status scaleIn and scaleOut.

---
 api/v1alpha1/mysqlcluster_types.go | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/api/v1alpha1/mysqlcluster_types.go b/api/v1alpha1/mysqlcluster_types.go
index f811a184..5565946c 100644
--- a/api/v1alpha1/mysqlcluster_types.go
+++ b/api/v1alpha1/mysqlcluster_types.go
@@ -247,7 +247,7 @@ type Persistence struct {
 type ClusterState string
 
 const (
-	// ClusterInitState indicates whether the cluster is initializing. 
+	// ClusterInitState indicates whether the cluster is initializing.
 	ClusterInitState ClusterState = "Initializing"
 	// ClusterUpdateState indicates whether the cluster is being updated.
 	ClusterUpdateState ClusterState = "Updating"
@@ -255,6 +255,10 @@ const (
 	ClusterReadyState ClusterState = "Ready"
 	// ClusterCloseState indicates whether the cluster is closed.
 	ClusterCloseState ClusterState = "Closed"
+	// ClusterScaleInState indicates whether the cluster replicas are decreasing.
+	ClusterScaleInState ClusterState = "ScaleIn"
+	// ClusterScaleOutState indicates whether the cluster replicas are increasing.
+	ClusterScaleOutState ClusterState = "ScaleOut"
 )
 
 // ClusterConditionType defines type for cluster condition type.
@@ -271,6 +275,10 @@ const (
 	ConditionClose ClusterConditionType = "Closed"
 	// ConditionError indicates whether there is an error in the cluster.
 	ConditionError ClusterConditionType = "Error"
+	// ConditionScaleIn indicates whether the cluster replicas are decreasing.
+	ConditionScaleIn ClusterConditionType = "ScaleIn"
+	// ConditionScaleOut indicates whether the cluster replicas are increasing.
+	ConditionScaleOut ClusterConditionType = "ScaleOut"
 )
 
 // ClusterCondition defines type for cluster conditions.
From c03590e225d5855a8712cce2282a3e4cb5e604b1 Mon Sep 17 00:00:00 2001 From: runkecheng <1131648942@qq.com> Date: Thu, 4 Nov 2021 17:23:05 +0800 Subject: [PATCH 2/3] *: Remove the reference to the Replicas in the containers. --- mysqlcluster/container/backup.go | 6 - mysqlcluster/container/init_sidecar.go | 5 - mysqlcluster/container/init_sidecar_test.go | 5 - mysqlcluster/container/xenon.go | 13 +- mysqlcluster/container/xenon_test.go | 16 +-- sidecar/config.go | 131 -------------------- sidecar/init.go | 17 --- 7 files changed, 3 insertions(+), 190 deletions(-) diff --git a/mysqlcluster/container/backup.go b/mysqlcluster/container/backup.go index 66688789..a4e96240 100644 --- a/mysqlcluster/container/backup.go +++ b/mysqlcluster/container/backup.go @@ -17,8 +17,6 @@ limitations under the License. package container import ( - "fmt" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -59,10 +57,6 @@ func (c *backupSidecar) getEnvVars() []corev1.EnvVar { Name: "SERVICE_NAME", Value: c.GetNameForResource(utils.HeadlessSVC), }, - { - Name: "REPLICAS", - Value: fmt.Sprintf("%d", *c.Spec.Replicas), - }, { Name: "MYSQL_ROOT_PASSWORD", Value: c.Spec.MysqlOpts.RootPassword, diff --git a/mysqlcluster/container/init_sidecar.go b/mysqlcluster/container/init_sidecar.go index 36400cc0..4ad6b334 100644 --- a/mysqlcluster/container/init_sidecar.go +++ b/mysqlcluster/container/init_sidecar.go @@ -17,7 +17,6 @@ limitations under the License. 
package container import ( - "fmt" "strconv" corev1 "k8s.io/api/core/v1" @@ -79,10 +78,6 @@ func (c *initSidecar) getEnvVars() []corev1.EnvVar { Name: "STATEFULSET_NAME", Value: c.GetNameForResource(utils.StatefulSet), }, - { - Name: "REPLICAS", - Value: fmt.Sprintf("%d", *c.Spec.Replicas), - }, { Name: "ADMIT_DEFEAT_HEARBEAT_COUNT", Value: strconv.Itoa(int(*c.Spec.XenonOpts.AdmitDefeatHearbeatCount)), diff --git a/mysqlcluster/container/init_sidecar_test.go b/mysqlcluster/container/init_sidecar_test.go index 59406481..4a44ebeb 100644 --- a/mysqlcluster/container/init_sidecar_test.go +++ b/mysqlcluster/container/init_sidecar_test.go @@ -17,7 +17,6 @@ limitations under the License. package container import ( - "fmt" "strconv" "testing" @@ -92,10 +91,6 @@ var ( Name: "STATEFULSET_NAME", Value: "sample-mysql", }, - { - Name: "REPLICAS", - Value: fmt.Sprintf("%d", *testInitSidecarCluster.Spec.Replicas), - }, { Name: "ADMIT_DEFEAT_HEARBEAT_COUNT", Value: strconv.Itoa(int(*testInitSidecarCluster.Spec.XenonOpts.AdmitDefeatHearbeatCount)), diff --git a/mysqlcluster/container/xenon.go b/mysqlcluster/container/xenon.go index 86b806ec..cdb9c68d 100644 --- a/mysqlcluster/container/xenon.go +++ b/mysqlcluster/container/xenon.go @@ -53,18 +53,7 @@ func (c *xenon) getEnvVars() []corev1.EnvVar { // getLifecycle get the container lifecycle. func (c *xenon) getLifecycle() *corev1.Lifecycle { - return &corev1.Lifecycle{ - PostStart: &corev1.Handler{ - Exec: &corev1.ExecAction{ - Command: []string{"sh", "-c", "/scripts/post-start.sh"}, - }, - }, - PreStop: &corev1.Handler{ - Exec: &corev1.ExecAction{ - Command: []string{"sh", "-c", "/scripts/pre-stop.sh"}, - }, - }, - } + return nil } // getResources get the container resources. 
diff --git a/mysqlcluster/container/xenon_test.go b/mysqlcluster/container/xenon_test.go index 628e160b..f6037852 100644 --- a/mysqlcluster/container/xenon_test.go +++ b/mysqlcluster/container/xenon_test.go @@ -28,7 +28,7 @@ import ( ) var ( - xenonReplicas int32 = 1 + xenonReplicas int32 = 3 xenonMysqlCluster = mysqlv1alpha1.MysqlCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "sample", @@ -70,19 +70,7 @@ func TestGetXenonEnvVar(t *testing.T) { } func TestGetXenonLifecycle(t *testing.T) { - lifecycle := &corev1.Lifecycle{ - PostStart: &corev1.Handler{ - Exec: &corev1.ExecAction{ - Command: []string{"sh", "-c", "/scripts/post-start.sh"}, - }, - }, - PreStop: &corev1.Handler{ - Exec: &corev1.ExecAction{ - Command: []string{"sh", "-c", "/scripts/pre-stop.sh"}, - }, - }, - } - assert.Equal(t, lifecycle, xenonCase.Lifecycle) + assert.Nil(t, xenonCase.Lifecycle) } func TestGetXenonResources(t *testing.T) { diff --git a/sidecar/config.go b/sidecar/config.go index cf9b701c..a3a24b47 100644 --- a/sidecar/config.go +++ b/sidecar/config.go @@ -38,8 +38,6 @@ type Config struct { ServiceName string // The name of the statefulset. StatefulSetName string - // Replicas is the number of pods. - Replicas int32 // The password of the root user. 
RootPassword string @@ -134,13 +132,6 @@ func NewInitConfig() *Config { } } - replicaStr := getEnvValue("REPLICAS") - replicas, err := strconv.ParseInt(replicaStr, 10, 32) - if err != nil { - log.Error(err, "invalid environment values", "REPLICAS", replicaStr) - panic(err) - } - initTokuDB := false if len(getEnvValue("INIT_TOKUDB")) > 0 { initTokuDB = true @@ -162,7 +153,6 @@ func NewInitConfig() *Config { NameSpace: getEnvValue("NAMESPACE"), ServiceName: getEnvValue("SERVICE_NAME"), StatefulSetName: getEnvValue("STATEFULSET_NAME"), - Replicas: int32(replicas), RootPassword: getEnvValue("MYSQL_ROOT_PASSWORD"), InternalRootPassword: getEnvValue("INTERNAL_ROOT_PASSWORD"), @@ -198,17 +188,9 @@ func NewInitConfig() *Config { // NewBackupConfig returns the configuration file needed for backup container. func NewBackupConfig() *Config { - replicaStr := getEnvValue("REPLICAS") - replicas, err := strconv.ParseInt(replicaStr, 10, 32) - if err != nil { - log.Error(err, "invalid environment values", "REPLICAS", replicaStr) - panic(err) - } - return &Config{ NameSpace: getEnvValue("NAMESPACE"), ServiceName: getEnvValue("SERVICE_NAME"), - Replicas: int32(replicas), ClusterName: getEnvValue("SERVICE_NAME"), RootPassword: getEnvValue("MYSQL_ROOT_PASSWORD"), @@ -224,17 +206,9 @@ func NewBackupConfig() *Config { // NewReqBackupConfig returns the configuration file needed for backup job. 
func NewReqBackupConfig() *Config { - replicaStr := getEnvValue("REPLICAS") - replicas, err := strconv.ParseInt(replicaStr, 10, 32) - if err != nil { - log.Error(err, "invalid environment values", "REPLICAS", replicaStr) - panic(err) - } - return &Config{ NameSpace: getEnvValue("NAMESPACE"), ServiceName: getEnvValue("SERVICE_NAME"), - Replicas: int32(replicas), BackupUser: getEnvValue("BACKUP_USER"), BackupPassword: getEnvValue("BACKUP_PASSWORD"), @@ -418,111 +392,6 @@ func (cfg *Config) buildClientConfig() (*ini.File, error) { return conf, nil } -func (cfg *Config) buildPostStart() ([]byte, error) { - ordinal, err := utils.GetOrdinal(cfg.HostName) - if err != nil { - return nil, err - } - - nums := ordinal - if cfg.existMySQLData { - nums = int(cfg.Replicas) - } - - host := fmt.Sprintf("%s.%s.%s", cfg.HostName, cfg.ServiceName, cfg.NameSpace) - - str := fmt.Sprintf(`#!/bin/sh -while true; do - info=$(curl -i -X GET -u root:%s http://%s:%d/v1/xenon/ping) - code=$(echo $info|grep "HTTP"|awk '{print $2}') - if [ "$code" -eq "200" ]; then - break - fi -done -`, cfg.RootPassword, host, utils.XenonPeerPort) - - if !cfg.existMySQLData && ordinal == 0 { - str = fmt.Sprintf(`%s -for i in $(seq 12); do - curl -i -X POST -u root:%s http://%s:%d/v1/raft/trytoleader - sleep 5 - curl -i -X GET -u root:%s http://%s:%d/v1/raft/status | grep LEADER - if [ $? 
-eq 0 ] ; then - echo "trytoleader success" - break - fi - if [ $i -eq 12 ]; then - echo "wait trytoleader failed" - fi -done -`, str, cfg.RootPassword, host, utils.XenonPeerPort, cfg.RootPassword, host, utils.XenonPeerPort) - } else { - str = fmt.Sprintf(`%s -i=0 -while [ $i -lt %d ]; do - if [ $i -ne %d ]; then - for k in $(seq 12); do - res=$(curl -i -X POST -d '{"address": "%s-'$i'.%s.%s:%d"}' -u root:%s http://%s:%d/v1/cluster/add) - code=$(echo $res|grep "HTTP"|awk '{print $2}') - if [ "$code" -eq "200" ]; then - break - fi - done - - for k in $(seq 12); do - res=$(curl -i -X POST -d '{"address": "%s:%d"}' -u root:%s http://%s-$i.%s.%s:%d/v1/cluster/add) - code=$(echo $res|grep "HTTP"|awk '{print $2}') - if [ "$code" -eq "200" ]; then - break - fi - done - fi - i=$((i+1)) -done -`, str, nums, ordinal, cfg.StatefulSetName, cfg.ServiceName, cfg.NameSpace, utils.XenonPort, - cfg.RootPassword, host, utils.XenonPeerPort, host, utils.XenonPort, cfg.RootPassword, - cfg.StatefulSetName, cfg.ServiceName, cfg.NameSpace, utils.XenonPeerPort) - } - - return utils.StringToBytes(str), nil -} - -func (cfg *Config) buildPreStop() []byte { - host := fmt.Sprintf("%s.%s.%s", cfg.HostName, cfg.ServiceName, cfg.NameSpace) - - str := fmt.Sprintf(`#!/bin/sh -while true; do - info=$(curl -i -X GET -u root:%s http://%s:%d/v1/xenon/ping) - code=$(echo $info|grep "HTTP"|awk '{print $2}') - if [ "$code" -eq "200" ]; then - break - fi -done - -curl -i -X PUT -u root:%s http://%s:%d/v1/raft/disable -for line in $(curl -X GET -u root:%s http://%s:%d/v1/raft/status | jq -r .nodes[] | cut -d : -f 1) -do - if [ "$line" != "%s" ]; then - for i in $(seq 12); do - info=$(curl -i -X POST -d '{"address": "%s:%d"}' -u root:%s http://$line:%d/v1/cluster/remove) - code=$(echo $info|grep "HTTP"|awk '{print $2}') - if [ "$code" -eq "200" ]; then - break - fi - if [ $i -eq 12 ]; then - echo "remove node failed" - break - fi - sleep 5 - done - fi -done -`, cfg.RootPassword, host, utils.XenonPeerPort, 
cfg.RootPassword, host, utils.XenonPeerPort, cfg.RootPassword, - host, utils.XenonPeerPort, host, host, utils.XenonPort, cfg.RootPassword, utils.XenonPeerPort) - - return utils.StringToBytes(str) -} - // buildLeaderStart build the leader-start.sh. func (cfg *Config) buildLeaderStart() []byte { str := fmt.Sprintf(`#!/usr/bin/env bash diff --git a/sidecar/init.go b/sidecar/init.go index 6bc0d358..d2ef8dae 100644 --- a/sidecar/init.go +++ b/sidecar/init.go @@ -115,23 +115,6 @@ func runInitCommand(cfg *Config) error { return fmt.Errorf("failed to save extra.cnf: %s", err) } - // build post-start.sh. - bashPostStartPath := path.Join(scriptsPath, "post-start.sh") - bashPostStart, err := cfg.buildPostStart() - if err != nil { - return fmt.Errorf("failed to build post-start.sh: %s", err) - } - if err = ioutil.WriteFile(bashPostStartPath, bashPostStart, os.FileMode(0755)); err != nil { - return fmt.Errorf("failed to write post-start.sh: %s", err) - } - - // build pre-stop.sh. - bashPreStopPath := path.Join(scriptsPath, "pre-stop.sh") - bashPreStop := cfg.buildPreStop() - if err = ioutil.WriteFile(bashPreStopPath, bashPreStop, os.FileMode(0755)); err != nil { - return fmt.Errorf("failed to write pre-stop.sh: %s", err) - } - // build leader-start.sh. bashLeaderStart := cfg.buildLeaderStart() leaderStartPath := path.Join(scriptsPath, "leader-start.sh") From f161f8a110c06b5931d2007d1e689cd17b142ee5 Mon Sep 17 00:00:00 2001 From: runkecheng <1131648942@qq.com> Date: Thu, 18 Nov 2021 10:27:42 +0800 Subject: [PATCH 3/3] *: Support to manage the increase/deletion of the raft node via Operator. 
#221 --- internal/xenon_executor.go | 41 ++++++++++++++++++++++ mysqlcluster/syncer/status.go | 66 ++++++++++++++++++++++++++++++++--- utils/common.go | 3 -- utils/constants.go | 6 ++++ 4 files changed, 108 insertions(+), 8 deletions(-) diff --git a/internal/xenon_executor.go b/internal/xenon_executor.go index 3717a3a8..d5276827 100644 --- a/internal/xenon_executor.go +++ b/internal/xenon_executor.go @@ -33,7 +33,10 @@ type XenonExecutor interface { GetRootPassword() string SetRootPassword(rootPassword string) RaftStatus(host string) (*apiv1alpha1.RaftStatus, error) + XenonPing(host string) error RaftTryToLeader(host string) error + ClusterAdd(host string, toAdd string) error + ClusterRemove(host string, toRemove string) error } func NewXenonExecutor() XenonExecutor { @@ -86,3 +89,41 @@ func (executor *xenonExecutor) RaftTryToLeader(host string) error { } return nil } + +func (executor *xenonExecutor) XenonPing(host string) error { + req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.XenonPing, nil)) + if err != nil { + return err + } + _, err = executor.httpExecutor.Execute(req) + if err != nil { + return fmt.Errorf("failed to ping host[%s], err: %s", req.Req.URL, err) + } + return nil +} + +func (executor *xenonExecutor) ClusterAdd(host string, toAdd string) error { + addHost := fmt.Sprintf("{\"address\": \"%s\"}", toAdd) + req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.ClusterAdd, addHost)) + if err != nil { + return err + } + _, err = executor.httpExecutor.Execute(req) + if err != nil { + return fmt.Errorf("failed to add host[%s] to host[%s], err: %s", addHost, req.Req.URL, err) + } + return nil +} + +func (executor *xenonExecutor) ClusterRemove(host string, toRemove string) error { + removeHost := fmt.Sprintf("{\"address\": \"%s\"}", toRemove) + req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.ClusterRemove, removeHost)) + if err != nil { + 
return err + } + _, err = executor.httpExecutor.Execute(req) + if err != nil { + return fmt.Errorf("failed to remove host[%s] from host[%s], err: %s", removeHost, req.Req.URL, err) + } + return nil +} diff --git a/mysqlcluster/syncer/status.go b/mysqlcluster/syncer/status.go index 5c162c7e..13d93908 100644 --- a/mysqlcluster/syncer/status.go +++ b/mysqlcluster/syncer/status.go @@ -119,8 +119,13 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) { s.Status.ReadyNodes = len(readyNodes) if s.Status.ReadyNodes == int(*s.Spec.Replicas) && int(*s.Spec.Replicas) != 0 { - s.Status.State = apiv1alpha1.ClusterReadyState - clusterCondition.Type = apiv1alpha1.ConditionReady + if err := s.reconcileXenon(s.Status.ReadyNodes); err != nil { + clusterCondition.Message = fmt.Sprintf("%s", err) + clusterCondition.Type = apiv1alpha1.ConditionError + } else { + s.Status.State = apiv1alpha1.ClusterReadyState + clusterCondition.Type = apiv1alpha1.ConditionReady + } } if len(s.Status.Conditions) == 0 { @@ -163,9 +168,13 @@ func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition { // When the cluster is ready or closed, the number of replicas changes, // indicating that the cluster is updating nodes. 
if oldState == apiv1alpha1.ClusterReadyState || oldState == apiv1alpha1.ClusterCloseState { - if int(*s.Spec.Replicas) != s.Status.ReadyNodes { - clusterCondition.Type = apiv1alpha1.ConditionUpdate - s.Status.State = apiv1alpha1.ClusterUpdateState + if int(*s.Spec.Replicas) > s.Status.ReadyNodes { + clusterCondition.Type = apiv1alpha1.ConditionScaleOut + s.Status.State = apiv1alpha1.ClusterScaleOutState + return clusterCondition + } else if int(*s.Spec.Replicas) < s.Status.ReadyNodes { + clusterCondition.Type = apiv1alpha1.ConditionScaleIn + s.Status.State = apiv1alpha1.ClusterScaleInState return clusterCondition } } @@ -302,6 +311,53 @@ func (s *StatusSyncer) updateNodeRaftStatus(node *apiv1alpha1.NodeStatus) error return nil } +func (s *StatusSyncer) reconcileXenon(readyNodes int) error { + expectXenonNodes := s.getExpectXenonNodes(readyNodes) + for _, nodeStatus := range s.Status.Nodes { + toRemove := utils.StringDiffIn(nodeStatus.RaftStatus.Nodes, expectXenonNodes) + if err := s.removeNodesFromXenon(nodeStatus.Name, toRemove); err != nil { + return err + } + toAdd := utils.StringDiffIn(expectXenonNodes, nodeStatus.RaftStatus.Nodes) + if err := s.addNodesInXenon(nodeStatus.Name, toAdd); err != nil { + return err + } + } + return nil +} + +func (s *StatusSyncer) getExpectXenonNodes(readyNodes int) []string { + expectXenonNodes := []string{} + for i := 0; i < readyNodes; i++ { + expectXenonNodes = append(expectXenonNodes, fmt.Sprintf("%s:%d", s.GetPodHostName(i), utils.XenonPort)) + } + return expectXenonNodes +} + +func (s *StatusSyncer) removeNodesFromXenon(host string, toRemove []string) error { + if err := s.XenonExecutor.XenonPing(host); err != nil { + return err + } + for _, removeHost := range toRemove { + if err := s.XenonExecutor.ClusterRemove(host, removeHost); err != nil { + return err + } + } + return nil +} + +func (s *StatusSyncer) addNodesInXenon(host string, toAdd []string) error { + if err := s.XenonExecutor.XenonPing(host); err != nil { + return 
err + } + for _, addHost := range toAdd { + if err := s.XenonExecutor.ClusterAdd(host, addHost); err != nil { + return err + } + } + return nil +} + // setPodHealthy set the pod lable healthy. func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error { healthy := "no" diff --git a/utils/common.go b/utils/common.go index 1d41e9be..e34f352f 100644 --- a/utils/common.go +++ b/utils/common.go @@ -110,7 +110,6 @@ func StringDiffIn(actual, desired []string) []string { diff = append(diff, aStr) } } - return diff } @@ -120,7 +119,6 @@ func stringIn(str string, strs []string) (int, bool) { return i, true } } - return 0, false } @@ -133,6 +131,5 @@ func UnmarshalJSON(in io.Reader, obj interface{}) error { if err = json.Unmarshal(body, obj); err != nil { return fmt.Errorf("error unmarshal data, error: %s, body: %s", err, string(body)) } - return nil } diff --git a/utils/constants.go b/utils/constants.go index 30c804cc..75723ad3 100644 --- a/utils/constants.go +++ b/utils/constants.go @@ -41,6 +41,9 @@ var ( XenonHttpUrls = map[XenonHttpUrl]string{ RaftStatus: http.MethodGet, RaftTryToLeader: http.MethodPost, + XenonPing: http.MethodGet, + ClusterAdd: http.MethodPost, + ClusterRemove: http.MethodPost, } ) @@ -161,5 +164,8 @@ type XenonHttpUrl string const ( RaftStatus XenonHttpUrl = "/v1/raft/status" + XenonPing XenonHttpUrl = "/v1/xenon/ping" + ClusterAdd XenonHttpUrl = "/v1/cluster/add" + ClusterRemove XenonHttpUrl = "/v1/cluster/remove" RaftTryToLeader XenonHttpUrl = "/v1/raft/trytoleader" )