Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New crd for log compact #5822

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d

var errs []error

backupFullPath, err := util.GetStoragePath(backup)
backupFullPath, err := util.GetStoragePath(&backup.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Expand Down Expand Up @@ -506,7 +506,7 @@ func (bm *Manager) performLogBackup(ctx context.Context, backup *v1alpha1.Backup
// startLogBackup starts log backup.
func (bm *Manager) startLogBackup(ctx context.Context, backup *v1alpha1.Backup) (*controller.BackupUpdateStatus, string, error) {
started := time.Now()
backupFullPath, err := util.GetStoragePath(backup)
backupFullPath, err := util.GetStoragePath(&backup.Spec.StorageProvider)
if err != nil {
klog.Errorf("Get backup full path of cluster %s failed, err: %s", bm, err)
return nil, "GetBackupRemotePathFailed", err
Expand Down
1 change: 1 addition & 0 deletions cmd/backup-manager/app/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewBackupMgrCommand() *cobra.Command {
cmds.AddCommand(NewRestoreCommand())
cmds.AddCommand(NewImportCommand())
cmds.AddCommand(NewCleanCommand())
cmds.AddCommand(NewCompactCommand())
return cmds
}

Expand Down
47 changes: 47 additions & 0 deletions cmd/backup-manager/app/cmd/compact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"

"github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact"
coptions "github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact/options"
"github.com/spf13/cobra"
)

func NewCompactCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "compact",
Short: "Compact log backup.",
RunE: func(cmd *cobra.Command, args []string) error {
opts := coptions.KubeOpts{}
if err := opts.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
opts.Kubeconfig = kubecfg

ctx := context.Background()
link, err := compact.NewKubelink(opts.Kubeconfig)
if err != nil {
return err
}
cx := compact.New(opts, link)
return cx.Run(ctx)
},
}

coptions.DefineFlags(cmd.Flags())
return cmd
}
147 changes: 147 additions & 0 deletions cmd/backup-manager/app/compact/kubelink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package compact

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact/options"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/apis/util/config"
pkgutil "github.com/pingcap/tidb-operator/pkg/backup/util"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
)

type Kubelink struct {
kube kubernetes.Interface
cr versioned.Interface

ref *CompactionRef
recorder record.EventRecorder
}

func NewKubelink(kubeconfig string) (*Kubelink, error) {
kube, cr, err := util.NewKubeAndCRCli(kubeconfig)
if err != nil {
return nil, err
}
return &Kubelink{
kube: kube,
cr: cr,
}, nil
}

func (k *Kubelink) GetCompaction(ctx context.Context, opts CompactionRef) (options.CompactOpts, error) {
if k.ref != nil {
return options.CompactOpts{}, errors.New("GetCompaction called twice")
}
k.ref = &opts
k.recorder = util.NewEventRecorder(k.kube, "compact.Kubelink")

cb, err := k.cr.PingcapV1alpha1().
CompactBackups(opts.Namespace).
Get(ctx, opts.Name, v1.GetOptions{})
if err != nil {
return options.CompactOpts{}, err
}

out := options.CompactOpts{}
args, err := pkgutil.GenStorageArgsForFlag(cb.Spec.StorageProvider, "")
if err != nil {
return options.CompactOpts{}, err
}
out.StorageOpts = args

startTs, err := config.ParseTSString(cb.Spec.StartTs)
if err != nil {
return options.CompactOpts{}, errors.Annotatef(err, "failed to parse startTs %s", cb.Spec.StartTs)
}
endTs, err := config.ParseTSString(cb.Spec.EndTs)
if err != nil {
return options.CompactOpts{}, errors.Annotatef(err, "failed to parse endTs %s", cb.Spec.EndTs)
}
out.FromTS = startTs
out.UntilTS = endTs

out.Name = cb.ObjectMeta.Name
out.Concurrency = uint64(cb.Spec.Concurrency)

if err := out.Verify(); err != nil {
return options.CompactOpts{}, err
}

return out, nil
}

type cOP func(*v1alpha1.CompactBackup) error

func (k *Kubelink) setState(newState string) cOP {
return func(cb *v1alpha1.CompactBackup) error {
cb.Status.State = newState
return nil
}
}

func (k *Kubelink) event(ty, reason, msg string) cOP {
return func(cb *v1alpha1.CompactBackup) error {
k.recorder.Event(cb, ty, reason, msg)
return nil
}
}

func (k *Kubelink) edit(ctx context.Context, extraOps ...cOP) error {
lister := k.cr.PingcapV1alpha1().
CompactBackups(k.ref.Namespace)
cb, err := lister.
Get(ctx, k.ref.Name, v1.GetOptions{})
if err != nil {
return err
}

for _, op := range extraOps {
if err := op(cb); err != nil {
return err
}
}

_, err = lister.Update(ctx, cb, v1.UpdateOptions{})
if err != nil {
return err
}
return nil
}

func (k *Kubelink) editOrWarn(ctx context.Context, extraOps ...cOP) {
if err := k.edit(ctx, extraOps...); err != nil {
klog.Warningf(
"failed to edit state for %s/%s: %v",
k.ref.Namespace,
k.ref.Name,
err,
)
}
}

func (k *Kubelink) OnStart(ctx context.Context) {
k.editOrWarn(ctx, k.setState("STARTED"), k.event(corev1.EventTypeNormal, "Started", "CompactionStarted"))
}

func (k *Kubelink) OnProgress(ctx context.Context, p Progress) {
message := fmt.Sprintf("RUNNING[READ_META(%d/%d),COMPACT_WORK(%d/%d)]",
p.MetaCompleted, p.MetaTotal, p.BytesCompacted, p.BytesToCompact)
k.editOrWarn(ctx, k.setState(message))
}

func (k *Kubelink) OnFinish(ctx context.Context, err error) {
if err != nil {
k.editOrWarn(ctx, k.setState(fmt.Sprintf("ERR[%s]", err)), k.event(corev1.EventTypeWarning, "Failed", err.Error()))
} else {
k.editOrWarn(ctx, k.setState("DONE"), k.event(corev1.EventTypeNormal, "Succeeded", "CompactionDone"))
}
}
79 changes: 79 additions & 0 deletions cmd/backup-manager/app/compact/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package options

import (
"math"

"github.com/pingcap/errors"

"github.com/spf13/pflag"
)

const (
fromTSUnset = math.MaxUint64
untilTSUnset = 0

namespaceFlag = "namespace"
resourceNameFlag = "resourceName"
tiKVVersionFlag = "tikvVersion"
storageStringFlag = "storage-string"
fromTSFlag = "from-ts"
untilTSFlag = "until-ts"
nameFlag = "name"
concurrencyFlag = "concurrency"
)

type KubeOpts struct {
// This should be fill by the caller.
Kubeconfig string `json:"-"`
Namespace string `json:"namespace"`
ResourceName string `json:"resourceName"`
TiKVVersion string `json:"tikvVersion"`
}

type CompactOpts struct {
FromTS uint64
UntilTS uint64
Name string
Concurrency uint64
StorageOpts []string
}

func DefineFlags(fs *pflag.FlagSet) {
fs.String(tiKVVersionFlag, "", "TiKV version of the resource")
fs.String(namespaceFlag, "", "Namespace of the resource")
fs.String(resourceNameFlag, "", "Name of the resource")
}

func (k *KubeOpts) ParseFromFlags(fs *pflag.FlagSet) error {
var err error
k.Namespace, err = fs.GetString(namespaceFlag)
if err != nil {
return errors.Trace(err)
}
k.ResourceName, err = fs.GetString(resourceNameFlag)
if err != nil {
return errors.Trace(err)
}
k.TiKVVersion, err = fs.GetString(tiKVVersionFlag)
if err != nil {
return errors.Trace(err)
}

return nil
}

func (c *CompactOpts) Verify() error {
if c.UntilTS < c.FromTS {
if c.UntilTS == untilTSUnset {
return errors.New("until-ts must be set")
}
if c.FromTS == fromTSUnset {
return errors.New("from-ts must be set")
}
return errors.Errorf("until-ts %d must be greater than from-ts %d", c.UntilTS, c.FromTS)
}
if c.Concurrency <= 0 {
return errors.Errorf("concurrency %d must be greater than 0", c.Concurrency)
}
return nil
}
Loading
Loading