Skip to content

Commit

Permalink
Merge pull request radondb#291 from acekingke/cloneToInit
Browse files Browse the repository at this point in the history
mysqlcluster,sidecar: support Clone initial when add new pod, and fix   bug after extend pvc, restore from backup fail  radondb#250, radondb#370
  • Loading branch information
andyli029 authored Nov 29, 2021
2 parents 5968419 + d6cb55f commit 0a48820
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 60 deletions.
5 changes: 4 additions & 1 deletion mysqlcluster/container/init_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ func (c *initSidecar) getEnvVars() []corev1.EnvVar {
Name: "RESTORE_FROM",
Value: c.Spec.RestoreFrom,
},

{
Name: "CLUSTER_NAME",
Value: c.Name,
},
getEnvVarFromSecret(sctName, "MYSQL_ROOT_PASSWORD", "root-password", false),
getEnvVarFromSecret(sctName, "INTERNAL_ROOT_PASSWORD", "internal-root-password", true),
getEnvVarFromSecret(sctName, "MYSQL_DATABASE", "mysql-database", true),
Expand Down
4 changes: 4 additions & 0 deletions mysqlcluster/container/init_sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ var (
Name: "RESTORE_FROM",
Value: "",
},
{
Name: "CLUSTER_NAME",
Value: "sample",
},
{
Name: "MYSQL_ROOT_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
Expand Down
8 changes: 6 additions & 2 deletions mysqlcluster/syncer/follower_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ func NewFollowerSVCSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) synce
service.Spec.Selector["role"] = "follower"
service.Spec.Selector["healthy"] = "yes"

if len(service.Spec.Ports) != 1 {
service.Spec.Ports = make([]corev1.ServicePort, 1)
if len(service.Spec.Ports) != 2 {
service.Spec.Ports = make([]corev1.ServicePort, 2)
}

service.Spec.Ports[0].Name = utils.MysqlPortName
service.Spec.Ports[0].Port = utils.MysqlPort
service.Spec.Ports[0].TargetPort = intstr.FromInt(utils.MysqlPort)
//xtrabckup
service.Spec.Ports[1].Name = utils.XBackupPortName
service.Spec.Ports[1].Port = utils.XBackupPort
service.Spec.Ports[1].TargetPort = intstr.FromInt(utils.XBackupPort)
return nil
})
}
9 changes: 6 additions & 3 deletions mysqlcluster/syncer/headless_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,17 @@ func NewHeadlessSVCSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) synce
// Use `publishNotReadyAddresses` to be able to access pods even if the pod is not ready.
service.Spec.PublishNotReadyAddresses = true

if len(service.Spec.Ports) != 1 {
service.Spec.Ports = make([]corev1.ServicePort, 1)
if len(service.Spec.Ports) != 2 {
service.Spec.Ports = make([]corev1.ServicePort, 2)
}

service.Spec.Ports[0].Name = utils.MysqlPortName
service.Spec.Ports[0].Port = utils.MysqlPort
service.Spec.Ports[0].TargetPort = intstr.FromInt(utils.MysqlPort)

//xtrabckup
service.Spec.Ports[1].Name = utils.XBackupPortName
service.Spec.Ports[1].Port = utils.XBackupPort
service.Spec.Ports[1].TargetPort = intstr.FromInt(utils.XBackupPort)
return nil
})
}
9 changes: 7 additions & 2 deletions mysqlcluster/syncer/leader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ func NewLeaderSVCSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) syncer.
service.Spec.Selector = c.GetSelectorLabels()
service.Spec.Selector["role"] = "leader"

if len(service.Spec.Ports) != 1 {
service.Spec.Ports = make([]corev1.ServicePort, 1)
if len(service.Spec.Ports) != 2 {
service.Spec.Ports = make([]corev1.ServicePort, 2)
}

service.Spec.Ports[0].Name = utils.MysqlPortName
service.Spec.Ports[0].Port = utils.MysqlPort
service.Spec.Ports[0].TargetPort = intstr.FromInt(utils.MysqlPort)

//xtrabckup
service.Spec.Ports[1].Name = utils.XBackupPortName
service.Spec.Ports[1].Port = utils.XBackupPort
service.Spec.Ports[1].TargetPort = intstr.FromInt(utils.XBackupPort)
return nil
})
}
173 changes: 132 additions & 41 deletions sidecar/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package sidecar

import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"strconv"
"strings"

"github.com/blang/semver"
"github.com/go-ini/ini"
Expand Down Expand Up @@ -116,6 +119,12 @@ type Config struct {

// directory in S3 bucket for cluster restore from
XRestoreFrom string

// Clone flag
CloneFlag bool

// GtidPurged is the gtid set of the slave cluster to purged.
GtidPurged string
}

// NewInitConfig returns a pointer to Config.
Expand Down Expand Up @@ -183,6 +192,10 @@ func NewInitConfig() *Config {
XCloudS3AccessKey: getEnvValue("S3_ACCESSKEY"),
XCloudS3SecretKey: getEnvValue("S3_SECRETKEY"),
XCloudS3Bucket: getEnvValue("S3_BUCKET"),

ClusterName: getEnvValue("CLUSTER_NAME"),
CloneFlag: false,
GtidPurged: "",
}
}

Expand All @@ -191,7 +204,7 @@ func NewBackupConfig() *Config {
return &Config{
NameSpace: getEnvValue("NAMESPACE"),
ServiceName: getEnvValue("SERVICE_NAME"),
ClusterName: getEnvValue("SERVICE_NAME"),
ClusterName: getEnvValue("CLUSTER_NAME"),
RootPassword: getEnvValue("MYSQL_ROOT_PASSWORD"),

BackupUser: getEnvValue("BACKUP_USER"),
Expand Down Expand Up @@ -299,48 +312,51 @@ func (cfg *Config) buildXenonConf() []byte {
hostName := fmt.Sprintf("%s.%s.%s", cfg.HostName, cfg.ServiceName, cfg.NameSpace)

str := fmt.Sprintf(`{
"log": {
"level": "INFO"
},
"server": {
"endpoint": "%s:%d",
"peer-address": "%s:%d",
"enable-apis": true
},
"replication": {
"passwd": "%s",
"user": "%s"
},
"rpc": {
"request-timeout": %d
},
"mysql": {
"admit-defeat-ping-count": 3,
"admin": "root",
"ping-timeout": %d,
"passwd": "%s",
"host": "localhost",
"version": "%s",
"master-sysvars": "%s",
"slave-sysvars": "%s",
"port": 3306,
"monitor-disabled": true
},
"raft": {
"election-timeout": %d,
"admit-defeat-hearbeat-count": %d,
"heartbeat-timeout": %d,
"meta-datadir": "/var/lib/xenon/",
"leader-start-command": "/scripts/leader-start.sh",
"leader-stop-command": "/scripts/leader-stop.sh",
"semi-sync-degrade": true,
"purge-binlog-disabled": true,
"super-idle": false
}
}
`, hostName, utils.XenonPort, hostName, utils.XenonPeerPort, cfg.ReplicationPassword, cfg.ReplicationUser, requestTimeout,
"log": {
"level": "INFO"
},
"server": {
"endpoint": "%s:%d",
"peer-address": "%s:%d",
"enable-apis": true
},
"replication": {
"passwd": "%s",
"user": "%s",
"gtid-purged": "%s"
},
"rpc": {
"request-timeout": %d
},
"mysql": {
"admit-defeat-ping-count": 3,
"admin": "root",
"ping-timeout": %d,
"passwd": "%s",
"host": "localhost",
"version": "%s",
"master-sysvars": "%s",
"slave-sysvars": "%s",
"port": 3306,
"monitor-disabled": true
},
"raft": {
"election-timeout": %d,
"admit-defeat-hearbeat-count": %d,
"heartbeat-timeout": %d,
"meta-datadir": "/var/lib/xenon/",
"leader-start-command": "/scripts/leader-start.sh",
"leader-stop-command": "/scripts/leader-stop.sh",
"semi-sync-degrade": true,
"purge-binlog-disabled": true,
"super-idle": false
}
}
`, hostName, utils.XenonPort, hostName, utils.XenonPeerPort, cfg.ReplicationPassword, cfg.ReplicationUser,
cfg.GtidPurged, requestTimeout,
pingTimeout, cfg.RootPassword, version, srcSysVars, replicaSysVars, cfg.ElectionTimeout,
cfg.AdmitDefeatHearbeatCount, heartbeatTimeout)

return utils.StringToBytes(str)
}

Expand Down Expand Up @@ -524,3 +540,78 @@ func (cfg *Config) executeS3Restore(path string) error {
}
return nil
}

// Do Restore after clone.
func (cfg *Config) executeCloneRestore() error {
// Check directory exist, create if not exist.
if _, err := os.Stat(utils.DataVolumeMountPath); os.IsNotExist(err) {
os.Mkdir(utils.DataVolumeMountPath, 0755)
}

// Empty the directory.
dir, err := ioutil.ReadDir(utils.DataVolumeMountPath)
if err != nil {
return fmt.Errorf("failed to read datadir %s", err)
}
for _, d := range dir {
os.RemoveAll(path.Join([]string{utils.DataVolumeMountPath, d.Name()}...))
}
// Xtrabackup prepare and apply-log-only.
cmd := exec.Command(xtrabackupCommand, "--defaults-file="+utils.ConfVolumeMountPath+"/my.cnf", "--use-memory=3072M", "--prepare", "--apply-log-only", "--target-dir=/backup/"+cfg.XRestoreFrom)
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to xtrabackup prepare apply-log-only : %s", err)
}
// Xtrabackup Prepare.
cmd = exec.Command(xtrabackupCommand, "--defaults-file="+utils.ConfVolumeMountPath+"/my.cnf", "--use-memory=3072M", "--prepare", "--target-dir=/backup/"+cfg.XRestoreFrom)
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to xtrabackup prepare : %s", err)
}
// Get the backup binlong info.
gtid, err := GetXtrabackupGTIDPurged("/backup/" + cfg.XRestoreFrom)
if err == nil {
cfg.GtidPurged = gtid
}
log.Info("get master gtid purged :", "gtid purged", cfg.GtidPurged)
// Xtrabackup copy-back.
cmd = exec.Command(xtrabackupCommand, "--defaults-file="+utils.ConfVolumeMountPath+"/my.cnf", "--use-memory=3072M", "--datadir="+utils.DataVolumeMountPath, "--copy-back", "--target-dir=/backup/"+cfg.XRestoreFrom)
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to xtrabackup copy-back : %s", err)
}
// Remove Relaybin.
// Because the relaybin is not used in the restore process,
// we can remove it to prevent it to be used by salve in the future.
cmd = exec.Command("rm", "-rf", utils.DataVolumeMountPath+"mysql-relay*")
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to remove relay-bin : %s", err)
}
// Run chown -R mysql.mysql /var/lib/mysql
cmd = exec.Command("chown", "-R", "mysql.mysql", utils.DataVolumeMountPath)
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to chown -R mysql.mysql : %s", err)
}
log.Info("execute clone restore success")
return nil
}

// Parse the xtrabackup_binlog_info, the format is filename \t position \t gitid1 \ngitid2 ...
// or filename \t position\n
// Get the gtid when it is existed, or return empty string.
// It used to purged the gtid when start the mysqld slave.
func GetXtrabackupGTIDPurged(backuppath string) (string, error) {
byteStream, err := ioutil.ReadFile(fmt.Sprintf("%s/xtrabackup_binlog_info", backuppath))
if err != nil {
return "", err
}
line := strings.TrimSuffix(string(byteStream), "\n")
ss := strings.Split(line, "\t")
if len(ss) != 3 {
return "", fmt.Errorf("info.file.content.invalid[%v]", string(byteStream))
}
// Replace multi gtidset \n
return strings.Replace(ss[2], "\n", "", -1), nil
}
Loading

0 comments on commit 0a48820

Please sign in to comment.