From d6cb55fc40af5eff2a9690397ab94e2cb95e9ef5 Mon Sep 17 00:00:00 2001
From: acekingke
Date: Mon, 8 Nov 2021 16:58:44 +0800
Subject: [PATCH] mysqlcluster,sidecar: support clone-based initialization when
 adding a new pod; fix restore-from-backup failure after extending the PVC
 (#250, #370)

---
 mysqlcluster/container/init_sidecar.go      |   5 +-
 mysqlcluster/container/init_sidecar_test.go |   4 +
 mysqlcluster/syncer/follower_service.go     |   8 +-
 mysqlcluster/syncer/headless_service.go     |   9 +-
 mysqlcluster/syncer/leader_service.go       |   9 +-
 sidecar/config.go                           | 173 +++++++++++++++-----
 sidecar/init.go                             |  94 +++++++++--
 sidecar/server.go                           |  76 ++++++++-
 sidecar/util.go                             |   3 +
 9 files changed, 321 insertions(+), 60 deletions(-)

diff --git a/mysqlcluster/container/init_sidecar.go b/mysqlcluster/container/init_sidecar.go
index 4ad6b334..d5e316a3 100644
--- a/mysqlcluster/container/init_sidecar.go
+++ b/mysqlcluster/container/init_sidecar.go
@@ -94,7 +94,10 @@ func (c *initSidecar) getEnvVars() []corev1.EnvVar {
 			Name:  "RESTORE_FROM",
 			Value: c.Spec.RestoreFrom,
 		},
-
+		{
+			Name:  "CLUSTER_NAME",
+			Value: c.Name,
+		},
 		getEnvVarFromSecret(sctName, "MYSQL_ROOT_PASSWORD", "root-password", false),
 		getEnvVarFromSecret(sctName, "INTERNAL_ROOT_PASSWORD", "internal-root-password", true),
 		getEnvVarFromSecret(sctName, "MYSQL_DATABASE", "mysql-database", true),
diff --git a/mysqlcluster/container/init_sidecar_test.go b/mysqlcluster/container/init_sidecar_test.go
index 4a44ebeb..5e09791f 100644
--- a/mysqlcluster/container/init_sidecar_test.go
+++ b/mysqlcluster/container/init_sidecar_test.go
@@ -107,6 +107,10 @@ var (
 			Name:  "RESTORE_FROM",
 			Value: "",
 		},
+		{
+			Name:  "CLUSTER_NAME",
+			Value: "sample",
+		},
 		{
 			Name: "MYSQL_ROOT_PASSWORD",
 			ValueFrom: &corev1.EnvVarSource{
diff --git a/mysqlcluster/syncer/follower_service.go b/mysqlcluster/syncer/follower_service.go
index f9686bcf..37a62f6f 100644
--- a/mysqlcluster/syncer/follower_service.go
+++ b/mysqlcluster/syncer/follower_service.go
@@ -51,13 +51,17 @@ func NewFollowerSVCSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) synce
 		service.Spec.Selector["role"] = "follower"
 		service.Spec.Selector["healthy"] = "yes"
 
-		if len(service.Spec.Ports) != 1 {
-			service.Spec.Ports = make([]corev1.ServicePort, 1)
+		if len(service.Spec.Ports) != 2 {
+			service.Spec.Ports = make([]corev1.ServicePort, 2)
 		}
 		service.Spec.Ports[0].Name = utils.MysqlPortName
 		service.Spec.Ports[0].Port = utils.MysqlPort
 		service.Spec.Ports[0].TargetPort = intstr.FromInt(utils.MysqlPort)
 
+		// xtrabackup port, used to clone data from this pod.
+		service.Spec.Ports[1].Name = utils.XBackupPortName
+		service.Spec.Ports[1].Port = utils.XBackupPort
+		service.Spec.Ports[1].TargetPort = intstr.FromInt(utils.XBackupPort)
 		return nil
 	})
 }
diff --git a/mysqlcluster/syncer/headless_service.go b/mysqlcluster/syncer/headless_service.go
index 97c75790..cad67409 100644
--- a/mysqlcluster/syncer/headless_service.go
+++ b/mysqlcluster/syncer/headless_service.go
@@ -56,14 +56,17 @@ func NewHeadlessSVCSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) synce
 		// Use `publishNotReadyAddresses` to be able to access pods even if the pod is not ready.
 		service.Spec.PublishNotReadyAddresses = true
 
-		if len(service.Spec.Ports) != 1 {
-			service.Spec.Ports = make([]corev1.ServicePort, 1)
+		if len(service.Spec.Ports) != 2 {
+			service.Spec.Ports = make([]corev1.ServicePort, 2)
 		}
 		service.Spec.Ports[0].Name = utils.MysqlPortName
 		service.Spec.Ports[0].Port = utils.MysqlPort
 		service.Spec.Ports[0].TargetPort = intstr.FromInt(utils.MysqlPort)
-
+		// xtrabackup port, used to clone data from this pod.
+		service.Spec.Ports[1].Name = utils.XBackupPortName
+		service.Spec.Ports[1].Port = utils.XBackupPort
+		service.Spec.Ports[1].TargetPort = intstr.FromInt(utils.XBackupPort)
 		return nil
 	})
 }
diff --git a/mysqlcluster/syncer/leader_service.go b/mysqlcluster/syncer/leader_service.go
index eb777a76..1251e19c 100644
--- a/mysqlcluster/syncer/leader_service.go
+++ b/mysqlcluster/syncer/leader_service.go
@@ -50,13 +50,18 @@ func NewLeaderSVCSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) syncer.
 		service.Spec.Selector = c.GetSelectorLabels()
 		service.Spec.Selector["role"] = "leader"
 
-		if len(service.Spec.Ports) != 1 {
-			service.Spec.Ports = make([]corev1.ServicePort, 1)
+		if len(service.Spec.Ports) != 2 {
+			service.Spec.Ports = make([]corev1.ServicePort, 2)
 		}
 		service.Spec.Ports[0].Name = utils.MysqlPortName
 		service.Spec.Ports[0].Port = utils.MysqlPort
 		service.Spec.Ports[0].TargetPort = intstr.FromInt(utils.MysqlPort)
+
+		// xtrabackup port, used to clone data from this pod.
+		service.Spec.Ports[1].Name = utils.XBackupPortName
+		service.Spec.Ports[1].Port = utils.XBackupPort
+		service.Spec.Ports[1].TargetPort = intstr.FromInt(utils.XBackupPort)
 		return nil
 	})
 }
diff --git a/sidecar/config.go b/sidecar/config.go
index 3887f231..35dd4a0d 100644
--- a/sidecar/config.go
+++ b/sidecar/config.go
@@ -18,9 +18,12 @@ package sidecar
 
 import (
 	"fmt"
+	"io/ioutil"
 	"os"
 	"os/exec"
+	"path"
 	"strconv"
+	"strings"
 
 	"github.com/blang/semver"
 	"github.com/go-ini/ini"
@@ -116,6 +119,12 @@ type Config struct {
 
 	// directory in S3 bucket for cluster restore from
 	XRestoreFrom string
+
+	// CloneFlag indicates whether the data was initialized by cloning from a peer pod.
+	CloneFlag bool
+
+	// GtidPurged is the GTID set to be purged when the slave is started.
+	GtidPurged string
 }
 
 // NewInitConfig returns a pointer to Config.
@@ -183,6 +192,10 @@ func NewInitConfig() *Config {
 		XCloudS3AccessKey: getEnvValue("S3_ACCESSKEY"),
 		XCloudS3SecretKey: getEnvValue("S3_SECRETKEY"),
 		XCloudS3Bucket:    getEnvValue("S3_BUCKET"),
+
+		ClusterName: getEnvValue("CLUSTER_NAME"),
+		CloneFlag:   false,
+		GtidPurged:  "",
 	}
 }
 
@@ -191,7 +204,7 @@ func NewBackupConfig() *Config {
 	return &Config{
 		NameSpace:   getEnvValue("NAMESPACE"),
 		ServiceName: getEnvValue("SERVICE_NAME"),
-		ClusterName: getEnvValue("SERVICE_NAME"),
+		ClusterName: getEnvValue("CLUSTER_NAME"),
 
 		RootPassword: getEnvValue("MYSQL_ROOT_PASSWORD"),
 
 		BackupUser: getEnvValue("BACKUP_USER"),
@@ -299,48 +312,51 @@ func (cfg *Config) buildXenonConf() []byte {
 
 	hostName := fmt.Sprintf("%s.%s.%s", cfg.HostName, cfg.ServiceName, cfg.NameSpace)
 
 	str := fmt.Sprintf(`{
-    "log": {
-        "level": "INFO"
-    },
-    "server": {
-        "endpoint": "%s:%d",
-        "peer-address": "%s:%d",
-        "enable-apis": true
-    },
-    "replication": {
-        "passwd": "%s",
-        "user": "%s"
-    },
-    "rpc": {
-        "request-timeout": %d
-    },
-    "mysql": {
-        "admit-defeat-ping-count": 3,
-        "admin": "root",
-        "ping-timeout": %d,
-        "passwd": "%s",
-        "host": "localhost",
-        "version": "%s",
-        "master-sysvars": "%s",
-        "slave-sysvars": "%s",
-        "port": 3306,
-        "monitor-disabled": true
-    },
-    "raft": {
-        "election-timeout": %d,
-        "admit-defeat-hearbeat-count": %d,
-        "heartbeat-timeout": %d,
-        "meta-datadir": "/var/lib/xenon/",
-        "leader-start-command": "/scripts/leader-start.sh",
-        "leader-stop-command": "/scripts/leader-stop.sh",
-        "semi-sync-degrade": true,
-        "purge-binlog-disabled": true,
-        "super-idle": false
-    }
-}
-`, hostName, utils.XenonPort, hostName, utils.XenonPeerPort, cfg.ReplicationPassword, cfg.ReplicationUser, requestTimeout,
+	"log": {
+		"level": "INFO"
+	},
+	"server": {
+		"endpoint": "%s:%d",
+		"peer-address": "%s:%d",
+		"enable-apis": true
+	},
+	"replication": {
+		"passwd": "%s",
+		"user": "%s",
+		"gtid-purged": "%s"
+	},
+	"rpc": {
+		"request-timeout": %d
+	},
+	"mysql": {
+		"admit-defeat-ping-count": 3,
+		"admin": "root",
+		"ping-timeout": %d,
+		"passwd": "%s",
+		"host": "localhost",
+		"version": "%s",
+		"master-sysvars": "%s",
+		"slave-sysvars": "%s",
+		"port": 3306,
+		"monitor-disabled": true
+	},
+	"raft": {
+		"election-timeout": %d,
+		"admit-defeat-hearbeat-count": %d,
+		"heartbeat-timeout": %d,
+		"meta-datadir": "/var/lib/xenon/",
+		"leader-start-command": "/scripts/leader-start.sh",
+		"leader-stop-command": "/scripts/leader-stop.sh",
+		"semi-sync-degrade": true,
+		"purge-binlog-disabled": true,
+		"super-idle": false
+	}
+	}
+	`, hostName, utils.XenonPort, hostName, utils.XenonPeerPort, cfg.ReplicationPassword, cfg.ReplicationUser,
+		cfg.GtidPurged, requestTimeout,
 	pingTimeout, cfg.RootPassword, version, srcSysVars, replicaSysVars, cfg.ElectionTimeout,
 	cfg.AdmitDefeatHearbeatCount, heartbeatTimeout)
+
 	return utils.StringToBytes(str)
 }
@@ -524,3 +540,78 @@ func (cfg *Config) executeS3Restore(path string) error {
 	}
 	return nil
 }
+
+// executeCloneRestore restores the data directory from a cloned backup.
+func (cfg *Config) executeCloneRestore() error {
+	// Check whether the data directory exists; create it if it does not.
+	if _, err := os.Stat(utils.DataVolumeMountPath); os.IsNotExist(err) {
+		os.Mkdir(utils.DataVolumeMountPath, 0755)
+	}
+
+	// Empty the directory.
+	dir, err := ioutil.ReadDir(utils.DataVolumeMountPath)
+	if err != nil {
+		return fmt.Errorf("failed to read datadir %s", err)
+	}
+	for _, d := range dir {
+		os.RemoveAll(path.Join([]string{utils.DataVolumeMountPath, d.Name()}...))
+	}
+	// Xtrabackup prepare with --apply-log-only.
+	cmd := exec.Command(xtrabackupCommand, "--defaults-file="+utils.ConfVolumeMountPath+"/my.cnf", "--use-memory=3072M", "--prepare", "--apply-log-only", "--target-dir=/backup/"+cfg.XRestoreFrom)
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to run xtrabackup --prepare --apply-log-only: %s", err)
+	}
+	// Xtrabackup prepare.
+	cmd = exec.Command(xtrabackupCommand, "--defaults-file="+utils.ConfVolumeMountPath+"/my.cnf", "--use-memory=3072M", "--prepare", "--target-dir=/backup/"+cfg.XRestoreFrom)
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to run xtrabackup --prepare: %s", err)
+	}
+	// Get the GTID set recorded in the backup's binlog info.
+	gtid, err := GetXtrabackupGTIDPurged("/backup/" + cfg.XRestoreFrom)
+	if err == nil {
+		cfg.GtidPurged = gtid
+	}
+	log.Info("got master GTID purged", "gtid purged", cfg.GtidPurged)
+	// Xtrabackup copy-back.
+	cmd = exec.Command(xtrabackupCommand, "--defaults-file="+utils.ConfVolumeMountPath+"/my.cnf", "--use-memory=3072M", "--datadir="+utils.DataVolumeMountPath, "--copy-back", "--target-dir=/backup/"+cfg.XRestoreFrom)
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to run xtrabackup --copy-back: %s", err)
+	}
+	// Remove the relay logs: the restore does not use them, and removing them
+	// prevents a future slave from picking them up. exec.Command does not
+	// expand globs, so run the removal through a shell.
+	cmd = exec.Command("/bin/bash", "-c", "rm -rf "+utils.DataVolumeMountPath+"/mysql-relay*")
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to remove relay logs: %s", err)
+	}
+	// Run chown -R mysql.mysql /var/lib/mysql.
+	cmd = exec.Command("chown", "-R", "mysql.mysql", utils.DataVolumeMountPath)
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to chown -R mysql.mysql: %s", err)
+	}
+	log.Info("execute clone restore success")
+	return nil
+}
+
+// GetXtrabackupGTIDPurged parses xtrabackup_binlog_info, whose format is
+// "filename \t position \t gtid1,\ngtid2,..." or just "filename \t position\n".
+// It returns the recorded GTID set, or an error when none is present.
+// The result is used as gtid_purged when the MySQL slave is started.
+func GetXtrabackupGTIDPurged(backuppath string) (string, error) {
+	byteStream, err := ioutil.ReadFile(fmt.Sprintf("%s/xtrabackup_binlog_info", backuppath))
+	if err != nil {
+		return "", err
+	}
+	line := strings.TrimSuffix(string(byteStream), "\n")
+	ss := strings.Split(line, "\t")
+	if len(ss) != 3 {
+		return "", fmt.Errorf("info.file.content.invalid[%v]", string(byteStream))
+	}
+	// A multi-line GTID set is wrapped with newlines; strip them.
+	return strings.Replace(ss[2], "\n", "", -1), nil
+}
diff --git a/sidecar/init.go b/sidecar/init.go
index 014c3c3d..f798f718 100644
--- a/sidecar/init.go
+++ b/sidecar/init.go
@@ -19,12 +19,14 @@ package sidecar
 
 import (
 	"fmt"
 	"io/ioutil"
+	"net/http"
 	"os"
 	"os/exec"
 	"os/user"
 	"path"
 	"strconv"
 
+	"github.com/radondb/radondb-mysql-kubernetes/utils"
 	"github.com/spf13/cobra"
 )
@@ -34,6 +36,9 @@ func NewInitCommand(cfg *Config) *cobra.Command {
 		Use:   "init",
 		Short: "do some initialization operations.",
 		Run: func(cmd *cobra.Command, args []string) {
+			if err := runCloneAndInit(cfg); err != nil {
+				log.Error(err, "clone error")
+			}
 			if err := runInitCommand(cfg); err != nil {
 				log.Error(err, "init command failed")
 				os.Exit(1)
@@ -44,6 +49,62 @@ func NewInitCommand(cfg *Config) *cobra.Command {
 
 	return cmd
 }
 
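For context on the parsing in GetXtrabackupGTIDPurged above: xtrabackup_binlog_info records the binlog file, the position, and (on a GTID-enabled server) the GTID set, separated by tabs, with a large GTID set wrapping across lines. A minimal standalone sketch of the same logic, using made-up UUIDs and file names:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical xtrabackup_binlog_info contents; the GTID set wraps
	// across lines, which is why the newlines must be stripped.
	sample := "mysql-bin.000003\t154\t" +
		"c78e7a5d-0000-0000-0000-000000000001:1-1000,\n" +
		"c78e7a5d-0000-0000-0000-000000000002:1-5\n"

	line := strings.TrimSuffix(sample, "\n")
	ss := strings.Split(line, "\t")
	if len(ss) != 3 {
		fmt.Println("no GTID set recorded (file and position only)")
		return
	}
	// Prints the set on one line, ready to be used as gtid_purged.
	fmt.Println(strings.Replace(ss[2], "\n", "", -1))
}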
+// CheckServiceExist checks whether the leader's or follower's backup service is healthy.
+func CheckServiceExist(cfg *Config, service string) bool {
+	serviceURL := fmt.Sprintf("http://%s-%s:%v%s", cfg.ClusterName, service, utils.XBackupPort, "/health")
+	req, err := http.NewRequest("GET", serviceURL, nil)
+	if err != nil {
+		log.Info("failed to check available service", "service", serviceURL, "error", err)
+		return false
+	}
+
+	client := &http.Client{}
+	client.Transport = transportWithTimeout(serverConnectTimeout)
+	resp, err := client.Do(req)
+	if err != nil {
+		log.Info("service was not available", "service", serviceURL, "error", err)
+		return false
+	}
+
+	defer func() { _ = resp.Body.Close() }()
+
+	if resp.StatusCode != 200 {
+		log.Info("service not available", "service", serviceURL, "HTTP status code", resp.StatusCode)
+		return false
+	}
+
+	return true
+}
+
+// runCloneAndInit clones data from the leader (preferred) or a follower before initialization.
+func runCloneAndInit(cfg *Config) error {
+	// Prefer cloning from the leader when its backup service is reachable.
+	serviceURL := ""
+	if CheckServiceExist(cfg, "leader") {
+		serviceURL = fmt.Sprintf("http://%s-%s:%v", cfg.ClusterName, "leader", utils.XBackupPort)
+	}
+	// Otherwise fall back to a follower.
+	if len(serviceURL) == 0 && CheckServiceExist(cfg, "follower") {
+		serviceURL = fmt.Sprintf("http://%s-%s:%v", cfg.ClusterName, "follower", utils.XBackupPort)
+	}
+	if len(serviceURL) != 0 {
+		// Stream a fresh backup from the peer into the init directory.
+		args := fmt.Sprintf("rm -rf /backup/initbackup;mkdir -p /backup/initbackup;curl --user $BACKUP_USER:$BACKUP_PASSWORD %s/download|xbstream -x -C /backup/initbackup; exit ${PIPESTATUS[0]}",
+			serviceURL)
+		cmd := exec.Command("/bin/bash", "-c", "--", args)
+		log.Info("runCloneAndInit", "cmd", args)
+		cmd.Stderr = os.Stderr
+		if err := cmd.Run(); err != nil {
+			return fmt.Errorf("failed to clone from %s: %s", serviceURL, err)
+		}
+		cfg.XRestoreFrom = backupInitDirectory
+		cfg.CloneFlag = true
+		return nil
+	}
+	log.Info("no leader or follower found")
+	return nil
+}
+
 // runInitCommand do some initialization operations.
 func runInitCommand(cfg *Config) error {
 	var err error
@@ -139,30 +200,43 @@ func runInitCommand(cfg *Config) error {
 		}
 	}
 
+	// Run the restore only when the data directory is empty:
+	// if /var/lib/mysql/mysql does not exist, the pod has no data yet,
+	// so restore it from the clone or from S3.
+	// Otherwise the pod already has data, and there is nothing to do.
+	if exists, _ := checkIfPathExists(dataPath + "/mysql"); !exists {
+		if len(cfg.XRestoreFrom) != 0 {
+			var restoreErr error
+			if cfg.CloneFlag {
+				restoreErr = cfg.executeCloneRestore()
+				if restoreErr != nil {
+					return fmt.Errorf("failed to execute clone restore: %s", restoreErr)
+				}
+			} else {
+				if err = cfg.executeS3Restore(cfg.XRestoreFrom); err != nil {
+					return fmt.Errorf("failed to restore from %s: %s", cfg.XRestoreFrom, err)
+				}
+			}
+
+		}
+	}
+
 	// build xenon.json.
 	xenonFilePath := path.Join(xenonPath, "xenon.json")
 	if err = ioutil.WriteFile(xenonFilePath, cfg.buildXenonConf(), 0644); err != nil {
 		return fmt.Errorf("failed to write xenon.json: %s", err)
 	}
-
-	// run the restore
-	if len(cfg.XRestoreFrom) != 0 {
-		if err = cfg.executeS3Restore(cfg.XRestoreFrom); err != nil {
-			return fmt.Errorf("failed to restore from %s: %s", cfg.XRestoreFrom, err)
-		}
-	}
-
 	log.Info("init command success")
 	return nil
 }
 
-/*start the backup http server*/
+// RunHttpServer starts the backup HTTP server.
 func RunHttpServer(cfg *Config, stop <-chan struct{}) error {
 	srv := newServer(cfg, stop)
 	return srv.ListenAndServe()
 }
 
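The download handler added to sidecar/server.go below cannot change the HTTP status code once the body has started streaming, so it reports the final outcome in an HTTP trailer instead. A minimal sketch of that trailer pattern in Go's net/http (the endpoint and trailer name mirror the patch; the payload and port are placeholders):

package main

import (
	"io"
	"net/http"
)

func main() {
	http.HandleFunc("/download", func(w http.ResponseWriter, r *http.Request) {
		// Trailers must be announced before the first write to the body.
		w.Header().Set("Trailer", "X-Backup-Status")
		w.Header().Set("Content-Type", "application/octet-stream")

		if _, err := io.WriteString(w, "...backup stream bytes..."); err != nil {
			return // client went away; nothing left to report
		}
		// Setting an announced header after the body was written
		// sends it as a trailer.
		w.Header().Set("X-Backup-Status", "Success")
	})
	// The port is illustrative; the sidecar listens on utils.XBackupPort.
	_ = http.ListenAndServe(":8082", nil)
}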
-// request a backup command
+// RunRequestBackup requests a backup from the given host.
 func RunRequestBackup(cfg *Config, host string) error {
 	_, err := requestABackup(cfg, host, serverBackupEndpoint)
 	return err
diff --git a/sidecar/server.go b/sidecar/server.go
index 806412a6..2f0bf762 100644
--- a/sidecar/server.go
+++ b/sidecar/server.go
@@ -19,8 +19,11 @@ package sidecar
 import (
 	"context"
 	"fmt"
+	"io"
 	"net"
 	"net/http"
+	"os"
+	"os/exec"
 	"strings"
 	"time"
 
@@ -32,6 +35,18 @@ const (
 	serverProbeEndpoint  = "/health"
 	serverBackupEndpoint = "/xbackup"
 	serverConnectTimeout = 5 * time.Second
+
+	// serverBackupDownLoadEndpoint is the download endpoint used for cloning.
+	serverBackupDownLoadEndpoint = "/download"
+
+	// backupStatusTrailer is the HTTP trailer that reports the backup status.
+	backupStatusTrailer = "X-Backup-Status"
+
+	// backup succeeded.
+	backupSuccessful = "Success"
+
+	// backup failed.
+	backupFailed = "Failed"
 )
 
 type server struct {
@@ -53,7 +68,8 @@ func newServer(cfg *Config, stop <-chan struct{}) *server {
 	// Add handle functions.
 	mux.HandleFunc(serverProbeEndpoint, srv.healthHandler)
 	mux.Handle(serverBackupEndpoint, maxClients(http.HandlerFunc(srv.backupHandler), 1))
-
+	mux.Handle(serverBackupDownLoadEndpoint,
+		maxClients(http.HandlerFunc(srv.backupDownLoadHandler), 1))
 	// Shutdown gracefully the http server.
 	go func() {
 		<-stop // wait for stop signal
@@ -87,6 +103,64 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
+// backupDownLoadHandler streams an xtrabackup of the data directory to the client.
+func (s *server) backupDownLoadHandler(w http.ResponseWriter, r *http.Request) {
+
+	if !s.isAuthenticated(r) {
+		http.Error(w, "Not authenticated!", http.StatusForbidden)
+		return
+	}
+
+	flusher, ok := w.(http.Flusher)
+	if !ok {
+		http.Error(w, "HTTP server does not support streaming!", http.StatusInternalServerError)
+		return
+	}
+
+	// Announce the trailer before the body is written.
+	w.Header().Set("Content-Type", "application/octet-stream")
+	w.Header().Set("Connection", "keep-alive")
+	w.Header().Set("Trailer", backupStatusTrailer)
+
+	// nolint: gosec
+	xtrabackup := exec.Command(xtrabackupCommand, s.cfg.XtrabackupArgs()...)
+	xtrabackup.Stderr = os.Stderr
+
+	stdout, err := xtrabackup.StdoutPipe()
+	if err != nil {
+		log.Error(err, "failed to create stdout pipe")
+		http.Error(w, "xtrabackup failed", http.StatusInternalServerError)
+		return
+	}
+
+	defer func() {
+		// best effort; ignore the close error.
+		_ = stdout.Close()
+	}()
+
+	if err := xtrabackup.Start(); err != nil {
+		log.Error(err, "failed to start xtrabackup command")
+		http.Error(w, "xtrabackup failed", http.StatusInternalServerError)
+		return
+	}
+
+	if _, err := io.Copy(w, stdout); err != nil {
+		log.Error(err, "failed to copy buffer")
+		http.Error(w, "buffer copy failed", http.StatusInternalServerError)
+		return
+	}
+
+	if err := xtrabackup.Wait(); err != nil {
+		log.Error(err, "failed waiting for xtrabackup to finish")
+		w.Header().Set(backupStatusTrailer, backupFailed)
+		http.Error(w, "xtrabackup failed", http.StatusInternalServerError)
+		return
+	}
+
+	// Success: report it in the trailer, since the headers are already sent.
+	w.Header().Set(backupStatusTrailer, backupSuccessful)
+	flusher.Flush()
+}
+
 func (s *server) isAuthenticated(r *http.Request) bool {
 	user, pass, ok := r.BasicAuth()
 	return ok && user == s.cfg.BackupUser && pass == s.cfg.BackupPassword
diff --git a/sidecar/util.go b/sidecar/util.go
index ddbc5794..d3ef01db 100644
--- a/sidecar/util.go
+++ b/sidecar/util.go
@@ -63,6 +63,9 @@ var (
 
 	// xcloudCommand is the upload tool file name.
 	xcloudCommand = "xbcloud"
+
+	// backupInitDirectory is the directory under /backup used for clone-restore data.
+	backupInitDirectory = "initbackup"
 )
 
 // copyFile the src file to dst.
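One caveat worth noting: the clone pipeline in runCloneAndInit checks only curl's exit status via ${PIPESTATUS[0]}, which reflects the HTTP response but not the X-Backup-Status trailer set after the stream ends. A native Go client could verify both; this hypothetical sketch assumes a service named sample-leader, the backup port 8082, and the usual BACKUP_USER/BACKUP_PASSWORD credentials:

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// Hypothetical peer address; the sidecar serves /download on utils.XBackupPort.
	req, err := http.NewRequest("GET", "http://sample-leader:8082/download", nil)
	if err != nil {
		panic(err)
	}
	req.SetBasicAuth(os.Getenv("BACKUP_USER"), os.Getenv("BACKUP_PASSWORD"))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Stream the payload; a real client would pipe it to `xbstream -x`.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		panic(err)
	}

	// Trailers are only populated after the body has been read to EOF.
	if status := resp.Trailer.Get("X-Backup-Status"); status != "Success" {
		fmt.Println("backup stream incomplete:", status)
		os.Exit(1)
	}
	fmt.Println("backup stream OK")
}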