*: add backup gtid and fix some bugs radondb#698
acekingke committed Sep 19, 2022
1 parent daa8ce7 commit 5b6aa23
Showing 14 changed files with 134 additions and 21 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha1/backup_types.go
@@ -64,6 +64,8 @@ type BackupStatus struct {
BackupDate string `json:"backupDate,omitempty"`
// Get the backup Type
BackupType string `json:"backupType,omitempty"`
// Get the Gtid
Gtid string `json:"gtid,omitempty"`
// Conditions represents the backup resource conditions list.
Conditions []BackupCondition `json:"conditions,omitempty"`
}
3 changes: 3 additions & 0 deletions api/v1alpha1/mysqlcluster_types.go
@@ -387,6 +387,9 @@ type MysqlClusterStatus struct {
ReadyNodes int `json:"readyNodes,omitempty"`
// State
State ClusterState `json:"state,omitempty"`
// LastBackup
LastBackup string `json:"lastbackup,omitempty"`
LastBackupGtid string `json:"lastbackupGtid,omitempty"`
// Conditions contains the list of the cluster conditions fulfilled.
Conditions []ClusterCondition `json:"conditions,omitempty"`
// Nodes contains the list of the node status fulfilled.
2 changes: 1 addition & 1 deletion api/v1alpha1/mysqlcluster_webhook.go
@@ -84,7 +84,7 @@ func (r *MysqlCluster) ValidateDelete() error {
return nil
}

// TODO: Add NFSServerAddress webhook & backup schedule.
// Add NFSServerAddress webhook & backup schedule.
func (r *MysqlCluster) validateNFSServerAddress(oldCluster *MysqlCluster) error {
isIP := net.ParseIP(r.Spec.NFSServerAddress) != nil
if len(r.Spec.NFSServerAddress) != 0 && !isIP {
18 changes: 14 additions & 4 deletions backup/cronbackup.go
@@ -57,14 +57,22 @@ func (j *CronJob) scheduledBackupsRunningCount() int {
backupsList := &apiv1alpha1.BackupList{}
// select all backups of the cluster with label recurrent=true that are not yet completed
selector := j.backupSelector()
client.MatchingFields{"status.completed": "false"}.ApplyToList(selector)
// Because k8s does not support fieldSelector with custom resources
// https://github.com/kubernetes/kubernetes/issues/51046
// we cannot use a field selector here; filter on Status.Completed in memory instead.
// client.MatchingFields{"status.completed": "false"}.ApplyToList(selector)

if err := j.Client.List(context.TODO(), backupsList, selector); err != nil {
if err := j.Client.List(context.TODO(), backupsList); err != nil {
log.Error(err, "failed getting backups", "selector", selector)
return 0
}

return len(backupsList.Items)
var rest []apiv1alpha1.Backup
for _, b := range backupsList.Items {
if !b.Status.Completed {
rest = append(rest, b)
}
}
return len(rest)
}

func (j *CronJob) backupSelector() *client.ListOptions {
@@ -133,10 +141,12 @@ func (j *CronJob) createBackup() (*apiv1alpha1.Backup, error) {
//RemoteDeletePolicy: j.BackupRemoteDeletePolicy,
HostName: fmt.Sprintf("%s-mysql-0", j.ClusterName),
},
Status: apiv1alpha1.BackupStatus{Completed: false},
}
if len(j.NFSServerAddress) > 0 {
backup.Spec.NFSServerAddress = j.NFSServerAddress
}

return backup, j.Client.Create(context.TODO(), backup)
}

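Note: as the new comment above explains, the Kubernetes API cannot evaluate field selectors such as status.completed for custom resources, so the running-backup count is now computed client-side. A minimal sketch of that pattern follows; the helper name and wiring are illustrative and not part of this commit.

package backup

import (
	"context"

	apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// countRunningBackups lists Backup objects matching the given label selector
// and counts those whose status is not yet completed. The filtering happens in
// memory because field selectors cannot be pushed to the API server for CRs.
func countRunningBackups(ctx context.Context, c client.Client, selector *client.ListOptions) (int, error) {
	backups := &apiv1alpha1.BackupList{}
	if err := c.List(ctx, backups, selector); err != nil {
		return 0, err
	}
	running := 0
	for _, b := range backups.Items {
		if !b.Status.Completed {
			running++
		}
	}
	return running, nil
}
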
12 changes: 9 additions & 3 deletions backup/syncer/job.go
@@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

v1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
"github.com/radondb/radondb-mysql-kubernetes/backup"
@@ -101,6 +102,9 @@ func (s *jobSyncer) updateStatus(job *batchv1.Job) {
if backType := s.job.Annotations[utils.JobAnonationType]; backType != "" {
s.backup.Status.BackupType = backType
}
if gtid := s.job.Annotations[utils.JobAnonationGtid]; gtid != "" {
s.backup.Status.Gtid = gtid
}
}

// check for failed condition
@@ -152,13 +156,15 @@ func (s *jobSyncer) ensurePodSpec(in corev1.PodSpec) corev1.PodSpec {
"/bin/bash", "-c", "--",
}
backupToDir, DateTime := utils.BuildBackupName(s.backup.Spec.ClusterName)
// add a script that records the backup name, date, type and gtid as job annotations
strAnnonations := fmt.Sprintf(`curl -X PATCH -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" -H "Content-Type: application/json-patch+json" \
--cacert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/apis/batch/v1/namespaces/%s/jobs/%s \
-d '[{"op": "add", "path": "/metadata/annotations/backupName", "value": "%s"}, {"op": "add", "path": "/metadata/annotations/backupDate", "value": "%s"}, {"op": "add", "path": "/metadata/annotations/backupType", "value": "NFS"}]';`,
s.backup.Namespace, s.backup.GetNameForJob(), backupToDir, DateTime)
-d "[{\"op\": \"add\", \"path\": \"/metadata/annotations/backupName\", \"value\": \"%s\"}, {\"op\": \"add\", \"path\": \"/metadata/annotations/backupDate\", \"value\": \"%s\"},{\"op\": \"add\", \"path\": \"/metadata/annotations/gtid\", \"value\": \"$( cat /backup/%s/xtrabackup_binlog_info|awk '{print $3}')\"}, {\"op\": \"add\", \"path\": \"/metadata/annotations/backupType\", \"value\": \"NFS\"}]";`,
s.backup.Namespace, s.backup.GetNameForJob(), backupToDir, DateTime, backupToDir)
log.Log.Info(strAnnonations)
// Add the disk usage check
// use expr because the shell cannot compare floating point numbers
checkUsage := `[ $(expr $(df /backup|awk 'NR>1 {print $4}') \> $(du /backup |awk 'END {if (NR > 1) {print $1 /(NR-1)} else print 0}')) -eq '1' ] || { echo disk available may be too small; exit 1;};`
checkUsage := `[ $(expr $(df /backup|awk 'NR>1 {print $4}') \> $(echo $(du -d1 /backup |awk 'END {if (NR > 1) {print $1 /(NR-1)} else print 0}')|cut -d. -f1)) -eq '1' ] || { echo disk available may be too small; exit 1;};`
in.Containers[0].Args = []string{
checkUsage + fmt.Sprintf("mkdir -p /backup/%s;"+
"curl --user $BACKUP_USER:$BACKUP_PASSWORD %s/download|xbstream -x -C /backup/%s; err1=${PIPESTATUS[0]};"+
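Note: the GTID travels from the backup pod to the Backup resource in two hops: the in-pod script PATCHes it onto the Job as an annotation, and updateStatus above copies it into the Backup status. Below is a hedged sketch of that second hop, assuming the utils.JobAnonation* constants shown at the end of this diff; the helper is illustrative, and the real logic lives inline in jobSyncer.updateStatus.

package syncer

import (
	batchv1 "k8s.io/api/batch/v1"

	v1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
	"github.com/radondb/radondb-mysql-kubernetes/utils"
)

// copyJobAnnotationsToStatus mirrors the annotations written by the backup pod
// (backupName, backupDate, gtid, backupType) into the Backup status.
func copyJobAnnotationsToStatus(job *batchv1.Job, status *v1alpha1.BackupStatus) {
	if name := job.Annotations[utils.JobAnonationName]; name != "" {
		status.BackupName = name
	}
	if date := job.Annotations[utils.JobAnonationDate]; date != "" {
		status.BackupDate = date
	}
	if gtid := job.Annotations[utils.JobAnonationGtid]; gtid != "" {
		status.Gtid = gtid
	}
	if backType := job.Annotations[utils.JobAnonationType]; backType != "" {
		status.BackupType = backType
	}
}
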
3 changes: 3 additions & 0 deletions charts/mysql-operator/crds/mysql.radondb.com_backups.yaml
@@ -125,6 +125,9 @@ spec:
- type
type: object
type: array
gtid:
description: Get the Gtid
type: string
required:
- completed
type: object
5 changes: 5 additions & 0 deletions charts/mysql-operator/crds/mysql.radondb.com_mysqlclusters.yaml
@@ -1376,6 +1376,11 @@ spec:
- type
type: object
type: array
lastbackup:
description: LastBackup
type: string
lastbackupGtid:
type: string
nodes:
description: Nodes contains the list of the node status fulfilled.
items:
3 changes: 3 additions & 0 deletions config/crd/bases/mysql.radondb.com_backups.yaml
@@ -125,6 +125,9 @@ spec:
- type
type: object
type: array
gtid:
description: Get the Gtid
type: string
required:
- completed
type: object
5 changes: 5 additions & 0 deletions config/crd/bases/mysql.radondb.com_mysqlclusters.yaml
@@ -1376,6 +1376,11 @@ spec:
- type
type: object
type: array
lastbackup:
description: LastBackup
type: string
lastbackupGtid:
type: string
nodes:
description: Nodes contains the list of the node status fulfilled.
items:
2 changes: 1 addition & 1 deletion controllers/backupcron_controller.go
@@ -120,7 +120,7 @@ func (r *BackupCronReconciler) updateClusterSchedule(ctx context.Context, cluste
log.V(1).Info("cluster already added to cron.", "key", cluster)

// change scheduler for already added crons
if !reflect.DeepEqual(entry.Schedule, schedule) {
if !reflect.DeepEqual(entry.Schedule, schedule) || j.NFSServerAddress != cluster.Spec.NFSServerAddress {
log.Info("update cluster scheduler", "key", cluster,
"scheduler", cluster.Spec.BackupSchedule)

37 changes: 36 additions & 1 deletion mysqlcluster/syncer/status.go
@@ -19,12 +19,14 @@ package syncer
import (
"context"
"fmt"
"sort"
"strconv"
"time"

"github.com/presslabs/controller-util/pkg/syncer"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -153,11 +155,44 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
if len(s.Status.Conditions) > maxStatusesQuantity {
s.Status.Conditions = s.Status.Conditions[len(s.Status.Conditions)-maxStatusesQuantity:]
}

// Update the last backup status on the cluster
s.updateLastBackup()
// Update all nodes' status.
return syncer.SyncResult{}, s.updateNodeStatus(ctx, s.cli, list.Items)
}

func (s *StatusSyncer) updateLastBackup() error {
// 1. fetch all backup CRs of this cluster, then keep the finished ones
backupsList := &apiv1alpha1.BackupList{}
labelSet := labels.Set{"cluster": s.Name}
if err := s.cli.List(context.TODO(), backupsList, &client.ListOptions{
Namespace: s.Namespace, LabelSelector: labelSet.AsSelector(),
}); err != nil {
return err
}
var finisheds []apiv1alpha1.Backup
for _, b := range backupsList.Items {
if b.Status.Completed {
finisheds = append(finisheds, b)
}
}
// 2. sort by creation time, newest first
sort.Slice(finisheds, func(i, j int) bool {
return finisheds[j].ObjectMeta.CreationTimestamp.Before(&finisheds[i].ObjectMeta.CreationTimestamp)
})
// 3. take the first (most recent) backup that has a backup name
for _, b := range finisheds {
if len(b.Status.BackupName) != 0 {
s.Status.LastBackup = b.Status.BackupName
s.Status.LastBackupGtid = b.Status.Gtid
break
}

}

return nil
}

// updateClusterStatus update the cluster status and returns condition.
func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition {
clusterCondition := apiv1alpha1.ClusterCondition{
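Note: updateLastBackup records the name and GTID of the most recent completed backup on the cluster status. Below is a compact, self-contained sketch of that selection logic; the helper is illustrative and assumes standard metav1.ObjectMeta creation timestamps, while the real code updates s.Status in place.

package syncer

import (
	"sort"

	apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
)

// newestNamedBackup returns the most recently created completed backup that
// carries a BackupName, or nil when none qualifies.
func newestNamedBackup(finished []apiv1alpha1.Backup) *apiv1alpha1.Backup {
	// metav1.Time embeds time.Time, so After yields a newest-first ordering.
	sort.Slice(finished, func(i, j int) bool {
		return finished[i].ObjectMeta.CreationTimestamp.Time.After(finished[j].ObjectMeta.CreationTimestamp.Time)
	})
	for i := range finished {
		if len(finished[i].Status.BackupName) != 0 {
			return &finished[i]
		}
	}
	return nil
}
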
9 changes: 5 additions & 4 deletions sidecar/server.go
@@ -103,11 +103,11 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not authenticated!", http.StatusForbidden)
return
}
backName, Datetime, err := RunTakeBackupCommand(s.cfg)
backName, Datetime, Gtid, err := RunTakeBackupCommand(s.cfg)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
msg, _ := json.Marshal(utils.JsonResult{Status: backupSuccessful, BackupName: backName, Date: Datetime})
msg, _ := json.Marshal(utils.JsonResult{Status: backupSuccessful, BackupName: backName, Gtid: Gtid, Date: Datetime})
w.Write(msg)
}
}
@@ -211,7 +211,7 @@ func transportWithTimeout(connectTimeout time.Duration) http.RoundTripper {
}
}

func setAnnonations(cfg *Config, backname string, DateTime string, BackupType string) error {
func setAnnonations(cfg *Config, backname string, DateTime string, Gtid, BackupType string) error {
config, err := rest.InClusterConfig()
if err != nil {
return err
@@ -231,6 +231,7 @@ }
}
job.Annotations[utils.JobAnonationName] = backname
job.Annotations[utils.JobAnonationDate] = DateTime
job.Annotations[utils.JobAnonationGtid] = Gtid
job.Annotations[utils.JobAnonationType] = BackupType
_, err = clientset.BatchV1().Jobs(cfg.NameSpace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
@@ -266,7 +267,7 @@ func requestABackup(cfg *Config, host string, endpoint string) (*http.Response,
var result utils.JsonResult
json.NewDecoder(resp.Body).Decode(&result)

err = setAnnonations(cfg, result.BackupName, result.Date, "S3") // set annotation
err = setAnnonations(cfg, result.BackupName, result.Date, result.Gtid, "S3") // set annotation
if err != nil {
return nil, fmt.Errorf("fail to set annotation: %s", err)
}
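Note: with this change the sidecar's backup response carries the GTID alongside the backup name and date, and requestABackup forwards it to setAnnonations. Below is a hedged, standalone example of that payload round-trip using the utils.JsonResult type from this diff; all field values are made up for illustration.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/radondb/radondb-mysql-kubernetes/utils"
)

func main() {
	// Serving side: marshal the result of a successful backup.
	msg, _ := json.Marshal(utils.JsonResult{
		Status:     "Success",
		BackupName: "sample_2022919101205",
		Gtid:       "319bd6eb-2ea2-11ed-bf40-7e1ef582b427:1-2",
		Date:       "2022-09-19 10:12:05",
	})

	// Requesting side: decode the body and pass the fields to setAnnonations.
	var result utils.JsonResult
	if err := json.Unmarshal(msg, &result); err != nil {
		panic(err)
	}
	fmt.Println(result.BackupName, result.Date, result.Gtid)
}
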
51 changes: 44 additions & 7 deletions sidecar/takebackup.go
@@ -17,36 +17,66 @@ limitations under the License.
package sidecar

import (
"bufio"
"fmt"
"os"
"os/exec"
"strings"
"sync"
)

// RunTakeBackupCommand starts a backup command
func RunTakeBackupCommand(cfg *Config) (string, string, error) {
func RunTakeBackupCommand(cfg *Config) (string, string, string, error) {
// cfg->XtrabackupArgs()
xtrabackup := exec.Command(xtrabackupCommand, cfg.XtrabackupArgs()...)

var err error
backupName, DateTime := cfg.XBackupName()
Gtid := ""
xcloud := exec.Command(xcloudCommand, cfg.XCloudArgs(backupName)...)
log.Info("xargs ", "xargs", strings.Join(cfg.XCloudArgs(backupName), " "))
if xcloud.Stdin, err = xtrabackup.StdoutPipe(); err != nil {
log.Error(err, "failed to pipline")
return "", "", err
return "", "", "", err
}
xtrabackup.Stderr = os.Stderr
//xtrabackup.Stderr = os.Stderr
xcloud.Stderr = os.Stderr

var wg sync.WaitGroup
Stderr, err := xtrabackup.StderrPipe()
if err != nil {
return "", "", "", fmt.Errorf("RunCommand: cmd.StderrPipe(): %v", err)
}
if err := xtrabackup.Start(); err != nil {
log.Error(err, "failed to start xtrabackup command")
return "", "", err
return "", "", "", err
}
if err := xcloud.Start(); err != nil {
log.Error(err, "fail start xcloud ")
return "", "", err
return "", "", "", err
}
scanner := bufio.NewScanner(Stderr)
//scanner.Split(ScanLinesR)
wg.Add(1)
go func() {
for scanner.Scan() {
text := scanner.Text()
fmt.Println(text)
if index := strings.Index(text, "GTID"); index != -1 {
// MySQL 5.7 example: MySQL binlog position: filename 'mysql-bin.000002', position '588', GTID of the last change '319bd6eb-2ea2-11ed-bf40-7e1ef582b427:1-2'
// MySQL 8.0, no GTID: MySQL binlog position: filename 'mysql-bin.000025', position '156'
length := len("GTID of the last change")
Gtid = strings.Trim(text[index+length:], " '") // trim space and \'
if len(Gtid) != 0 {
log.Info("Catch gtid: " + Gtid)
}

}
}
wg.Done()
}()

wg.Wait()
// pipe command fail one, whole things fail
errorChannel := make(chan error, 2)
go func() {
@@ -55,11 +85,18 @@ func RunTakeBackupCommand(cfg *Config) (string, string, error) {
go func() {
errorChannel <- xtrabackup.Wait()
}()
defer xtrabackup.Wait()
defer xcloud.Wait()

for i := 0; i < 2; i++ {
if err = <-errorChannel; err != nil {
return "", "", err
log.Info("catch error , need to stop")
_ = xtrabackup.Process.Kill()
_ = xcloud.Process.Kill()

return "", "", "", err
}
}
return backupName, DateTime, nil

return backupName, DateTime, Gtid, nil
}
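Note: the GTID is scraped from xtrabackup's stderr, keyed on the fixed phrase "GTID of the last change"; the MySQL 8.0 sample line in the comments shows why the result can legitimately be empty. Below is a small standalone sketch of the same extraction; the helper name is illustrative and it matches on the full phrase, which is equivalent to the index arithmetic above.

package sidecar

import "strings"

// extractGtid pulls the GTID set out of an xtrabackup stderr line such as:
//   MySQL binlog position: filename 'mysql-bin.000002', position '588', GTID of the last change '319bd6eb-2ea2-11ed-bf40-7e1ef582b427:1-2'
// It returns "" for lines without the marker, e.g. MySQL 8.0 output with no GTID.
func extractGtid(line string) string {
	const marker = "GTID of the last change"
	idx := strings.Index(line, marker)
	if idx == -1 {
		return ""
	}
	// Trim the surrounding space and single quotes around the GTID set.
	return strings.Trim(line[idx+len(marker):], " '")
}
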
3 changes: 3 additions & 0 deletions utils/constants.go
@@ -177,6 +177,8 @@
JobAnonationName = "backupName"
// Job Annonations date
JobAnonationDate = "backupDate"
// Job Annotations Gtid
JobAnonationGtid = "gtid"
// Job Annonations type
JobAnonationType = "backupType"
)
@@ -211,5 +213,6 @@
type JsonResult struct {
Status string `json:"status"`
BackupName string `json:"backupName"`
Gtid string `json:"gtid"`
Date string `json:"date"`
}
