Skip to content

Commit

Permalink
*: Support make S3 and NFS backup schedule together radondb#689
Browse files Browse the repository at this point in the history
  • Loading branch information
acekingke committed Nov 7, 2022
1 parent e7a7c52 commit 2aaad51
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 17 deletions.
11 changes: 11 additions & 0 deletions api/v1alpha1/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ type MysqlClusterSpec struct {
// +optional
BackupSchedule string `json:"backupSchedule,omitempty"`

// Specify that crontab job backup both on NFS and S3 storage.
// +optional
BothS3NFS *BothS3NFSOpt `json:"bothS3NFS,omitempty"`

// If set keeps last BackupScheduleJobsHistoryLimit Backups
// +optional
// +kubebuilder:default:=6
Expand Down Expand Up @@ -288,6 +292,13 @@ type Persistence struct {
Size string `json:"size,omitempty"`
}

// bothS3NFS opt
type BothS3NFSOpt struct {
// NFS schedule.
NFSSchedule string `json:"nfsSchedule,omitempty"`
S3Schedule string `json:"s3Schedule,omitempty"`
}

// ClusterState defines cluster state.
type ClusterState string

Expand Down
20 changes: 20 additions & 0 deletions api/v1alpha1/mysqlcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (r *MysqlCluster) ValidateCreate() error {
if err := r.validateNFSServerAddress(r); err != nil {
return err
}

if err := r.validBothS3NFS(); err != nil {
return err
}

if err := r.validateMysqlVersionAndImage(); err != nil {
return err
}
Expand All @@ -74,6 +79,11 @@ func (r *MysqlCluster) ValidateUpdate(old runtime.Object) error {
if err := r.validateLowTableCase(oldCluster); err != nil {
return err
}

if err := r.validBothS3NFS(); err != nil {
return err
}

if err := r.validateMysqlVersionAndImage(); err != nil {
return err
}
Expand Down Expand Up @@ -145,3 +155,13 @@ func (r *MysqlCluster) validateMysqlVersionAndImage() error {
}
return nil
}

// Validate BothS3NFS
func (r *MysqlCluster) validBothS3NFS() error {
if r.Spec.BothS3NFS != nil &&
(len(r.Spec.NFSServerAddress) == 0 ||
len(r.Spec.BackupSecretName) == 0) {
return apierrors.NewForbidden(schema.GroupResource{}, "", fmt.Errorf("if BothS3NFS is set, backupSchedule/backupSecret/nfsAddress should not empty"))
}
return nil
}
20 changes: 20 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions backup/cronbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ type CronJob struct {
BackupScheduleJobsHistoryLimit *int
Image string
NFSServerAddress string
Log logr.Logger
BackupType string

Log logr.Logger
}

func (j *CronJob) Run() {
Expand All @@ -45,7 +47,7 @@ func (j *CronJob) Run() {
log.Info("at least a backup is running", "running_backups_count", j.scheduledBackupsRunningCount())
return
}

//TODO: if BothS3NFS, it need create backup without nfs address.
// create the backup
if _, err := j.createBackup(); err != nil {
log.Error(err, "failed to create backup")
Expand Down Expand Up @@ -118,7 +120,7 @@ func (j *CronJob) backupGC() {
}

func (j *CronJob) createBackup() (*apiv1alpha1.Backup, error) {
backupName := fmt.Sprintf("%s-auto-%s", j.ClusterName, time.Now().Format("2006-01-02t15-04-05"))
backupName := fmt.Sprintf("%s-%s-%s", j.ClusterName, j.BackupType, time.Now().Format("2006-01-02t15-04-05"))

backup := &apiv1alpha1.Backup{
ObjectMeta: metav1.ObjectMeta{
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/mysql.radondb.com_mysqlclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ spec:
description: Represents the name of the secret that contains credentials
to connect to the storage provider to store backups.
type: string
bothS3NFS:
description: Specify that crontab job backup both on NFS and S3 storage.
properties:
nfsSchedule:
description: NFS schedule.
type: string
s3Schedule:
type: string
type: object
metricsOpts:
default:
enabled: false
Expand Down
56 changes: 42 additions & 14 deletions controllers/backupcron_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,39 +90,64 @@ func (r *BackupCronReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
// if spec.backupScheduler is not set then don't do anything
if len(instance.Spec.BackupSchedule) == 0 || *instance.Spec.Replicas == 0 {
if instance.Spec.BothS3NFS == nil &&
(len(instance.Spec.BackupSchedule) == 0 || *instance.Spec.Replicas == 0) {
if err := r.Cron.Remove(instance.Name); err == nil {
log.V(1).Info("remove cronjob from cluster", "name", instance.Name)
}

return reconcile.Result{}, nil
}
// do the bothS3NFS
if instance.Spec.BothS3NFS != nil {
froms := []struct {
Sche string
T string
}{
{instance.Spec.BothS3NFS.NFSSchedule, "nfs"},
{instance.Spec.BothS3NFS.S3Schedule, "s3"},
}

schedule, err := cron.Parse(instance.Spec.BackupSchedule)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to parse schedule: %s", err)
}
for _, f := range froms {
schestr, t := f.Sche, f.T
schedule, err := cron.Parse(schestr)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to parse schedule: %s", err)
}
if err := r.updateClusterSchedule(ctx, instance.Unwrap(), schedule, t, log); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
} else {
schedule, err := cron.Parse(instance.Spec.BackupSchedule)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to parse schedule: %s", err)
}

log.V(1).Info("register cluster in cronjob", "key", instance, "schedule", schedule)

log.V(1).Info("register cluster in cronjob", "key", instance, "schedule", schedule)
return ctrl.Result{}, r.updateClusterSchedule(ctx, instance.Unwrap(), schedule, "", log)
}

return ctrl.Result{}, r.updateClusterSchedule(ctx, instance.Unwrap(), schedule, log)
}

// updateClusterSchedule creates/updates a cron job for specified cluster.
func (r *BackupCronReconciler) updateClusterSchedule(ctx context.Context, cluster *apiv1alpha1.MysqlCluster, schedule cron.Schedule, log logr.Logger) error {
func (r *BackupCronReconciler) updateClusterSchedule(ctx context.Context, cluster *apiv1alpha1.MysqlCluster, schedule cron.Schedule, BackupType string, log logr.Logger) error {

r.LockJobRegister.Lock()
defer r.LockJobRegister.Unlock()

for _, entry := range r.Cron.Entries() {
j, ok := entry.Job.(*backup.CronJob)
if ok && j.ClusterName == cluster.Name && j.Namespace == cluster.Namespace {
if ok && j.ClusterName == cluster.Name &&
j.Namespace == cluster.Namespace && j.BackupType == BackupType {
log.V(1).Info("cluster already added to cron.", "key", cluster)

// change scheduler for already added crons
if !reflect.DeepEqual(entry.Schedule, schedule) {
log.Info("update cluster scheduler", "key", cluster,
"scheduler", cluster.Spec.BackupSchedule)
"scheduler", schedule)

if err := r.Cron.Remove(cluster.Name); err != nil {
return err
Expand All @@ -136,17 +161,20 @@ func (r *BackupCronReconciler) updateClusterSchedule(ctx context.Context, cluste
return nil
}
}
nfsServerAddress := ""
if BackupType == "nfs" {
nfsServerAddress = cluster.Spec.NFSServerAddress
}

r.Cron.Schedule(schedule, &backup.CronJob{
ClusterName: cluster.Name,
Namespace: cluster.Namespace,
Client: r.Client,
Image: cluster.Spec.PodPolicy.SidecarImage,
BackupScheduleJobsHistoryLimit: cluster.Spec.BackupScheduleJobsHistoryLimit,
//BackupRemoteDeletePolicy: cluster.Spec.BackupRemoteDeletePolicy,

NFSServerAddress: cluster.Spec.NFSServerAddress,
Log: log,
NFSServerAddress: nfsServerAddress,
BackupType: BackupType,
Log: log,
}, cluster.Name)

return nil
Expand Down

0 comments on commit 2aaad51

Please sign in to comment.