Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove snapshot client from backup controller #5299

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/5299-jxun
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove snapshot related lister, informer and client from backup controller.
2 changes: 1 addition & 1 deletion pkg/backup/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Request struct {
VolumeSnapshots []*volume.Snapshot
PodVolumeBackups []*velerov1api.PodVolumeBackup
BackedUpItems map[itemKey]struct{}
CSISnapshots []*snapshotv1api.VolumeSnapshot
CSISnapshots []snapshotv1api.VolumeSnapshot
}

// BackupResourceList returns the list of backed up resources grouped by the API
Expand Down
38 changes: 0 additions & 38 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
Expand All @@ -50,7 +49,6 @@ import (
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1informers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/storage"
Expand Down Expand Up @@ -553,36 +551,6 @@ func (s *server) initRestic() error {
return nil
}

func (s *server) getCSISnapshotListers() (snapshotv1listers.VolumeSnapshotLister, snapshotv1listers.VolumeSnapshotContentLister, snapshotv1listers.VolumeSnapshotClassLister) {
// Make empty listers that will only be populated if CSI is properly enabled.
var vsLister snapshotv1listers.VolumeSnapshotLister
var vscLister snapshotv1listers.VolumeSnapshotContentLister
var vsClassLister snapshotv1listers.VolumeSnapshotClassLister
var err error

// If CSI is enabled, check for the CSI groups and generate the listers
// If CSI isn't enabled, return empty listers.
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
_, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1api.SchemeGroupVersion.String())
switch {
case apierrors.IsNotFound(err):
// CSI is enabled, but the required CRDs aren't installed, so halt.
s.logger.Fatalf("The '%s' feature flag was specified, but CSI API group [%s] was not found.", velerov1api.CSIFeatureFlag, snapshotv1api.SchemeGroupVersion.String())
case err == nil:
// CSI is enabled, and the resources were found.
// Instantiate the listers fully
s.logger.Debug("Creating CSI listers")
// Access the wrapped factory directly here since we've already done the feature flag check above to know it's safe.
vsLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshots().Lister()
vscLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshotContents().Lister()
vsClassLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshotClasses().Lister()
case err != nil:
cmd.CheckError(err)
}
}
return vsLister, vscLister, vsClassLister
}

func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string) error {
s.logger.Info("Starting controllers")

Expand All @@ -607,8 +575,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string

backupStoreGetter := persistence.NewObjectBackupStoreGetter(s.credentialFileStore)

csiVSLister, csiVSCLister, csiVSClassLister := s.getCSISnapshotListers()

backupTracker := controller.NewBackupTracker()

backupControllerRunInfo := func() controllerRunInfo {
Expand Down Expand Up @@ -645,10 +611,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
defaultVolumeSnapshotLocations,
s.metrics,
s.config.formatFlag.Parse(),
csiVSLister,
s.csiSnapshotClient,
csiVSCLister,
csiVSClassLister,
backupStoreGetter,
s.credentialFileStore,
)
Expand Down
183 changes: 88 additions & 95 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/csi"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/storage"
Expand Down Expand Up @@ -76,29 +74,25 @@ import (

type backupController struct {
*genericController
discoveryHelper discovery.Helper
backupper pkgbackup.Backupper
lister velerov1listers.BackupLister
client velerov1client.BackupsGetter
kbClient kbclient.Client
clock clock.Clock
backupLogLevel logrus.Level
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
defaultBackupLocation string
defaultVolumesToFsBackup bool
defaultBackupTTL time.Duration
defaultCSISnapshotTimeout time.Duration
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
formatFlag logging.Format
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister
volumeSnapshotClient *snapshotterClientSet.Clientset
volumeSnapshotContentLister snapshotv1listers.VolumeSnapshotContentLister
volumeSnapshotClassLister snapshotv1listers.VolumeSnapshotClassLister
credentialFileStore credentials.FileStore
discoveryHelper discovery.Helper
backupper pkgbackup.Backupper
lister velerov1listers.BackupLister
client velerov1client.BackupsGetter
kbClient kbclient.Client
clock clock.Clock
backupLogLevel logrus.Level
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
defaultBackupLocation string
defaultVolumesToFsBackup bool
defaultBackupTTL time.Duration
defaultCSISnapshotTimeout time.Duration
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
formatFlag logging.Format
credentialFileStore credentials.FileStore
}

func NewBackupController(
Expand All @@ -119,38 +113,30 @@ func NewBackupController(
defaultSnapshotLocations map[string]string,
metrics *metrics.ServerMetrics,
formatFlag logging.Format,
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
volumeSnapshotClient *snapshotterClientSet.Clientset,
volumeSnapshotContentLister snapshotv1listers.VolumeSnapshotContentLister,
volumesnapshotClassLister snapshotv1listers.VolumeSnapshotClassLister,
backupStoreGetter persistence.ObjectBackupStoreGetter,
credentialStore credentials.FileStore,
) Interface {
c := &backupController{
genericController: newGenericController(Backup, logger),
discoveryHelper: discoveryHelper,
backupper: backupper,
lister: backupInformer.Lister(),
client: client,
clock: &clock.RealClock{},
backupLogLevel: backupLogLevel,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
kbClient: kbClient,
defaultBackupLocation: defaultBackupLocation,
defaultVolumesToFsBackup: defaultVolumesToFsBackup,
defaultBackupTTL: defaultBackupTTL,
defaultCSISnapshotTimeout: defaultCSISnapshotTimeout,
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
volumeSnapshotLister: volumeSnapshotLister,
volumeSnapshotClient: volumeSnapshotClient,
volumeSnapshotContentLister: volumeSnapshotContentLister,
volumeSnapshotClassLister: volumesnapshotClassLister,
backupStoreGetter: backupStoreGetter,
credentialFileStore: credentialStore,
genericController: newGenericController(Backup, logger),
discoveryHelper: discoveryHelper,
backupper: backupper,
lister: backupInformer.Lister(),
client: client,
clock: &clock.RealClock{},
backupLogLevel: backupLogLevel,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
kbClient: kbClient,
defaultBackupLocation: defaultBackupLocation,
defaultVolumesToFsBackup: defaultVolumesToFsBackup,
defaultBackupTTL: defaultBackupTTL,
defaultCSISnapshotTimeout: defaultCSISnapshotTimeout,
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
backupStoreGetter: backupStoreGetter,
credentialFileStore: credentialStore,
}

c.syncHandler = c.processBackup
Expand Down Expand Up @@ -662,47 +648,51 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {

// Empty slices here so that they can be passed in to the persistBackup call later, regardless of whether or not CSI's enabled.
// This way, we only make the Lister call if the feature flag's on.
var volumeSnapshots []*snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []*snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []*snapshotv1api.VolumeSnapshotClass
var volumeSnapshots []snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []snapshotv1api.VolumeSnapshotClass
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
selector := label.NewSelectorForBackup(backup.Name)
// Listers are wrapped in a nil check out of caution, since they may not be populated based on the
// EnableCSI feature flag. This is more to guard against programmer error, as they shouldn't be nil
// when EnableCSI is on.
if c.volumeSnapshotLister != nil {
volumeSnapshots, err = c.volumeSnapshotLister.List(selector)
if err != nil {
backupLog.Error(err)
}
vsList := &snapshotv1api.VolumeSnapshotList{}
vscList := &snapshotv1api.VolumeSnapshotContentList{}

err = c.checkVolumeSnapshotReadyToUse(context.Background(), volumeSnapshots, backup.Spec.CSISnapshotTimeout.Duration)
if err != nil {
backupLog.Errorf("fail to wait VolumeSnapshot change to Ready: %s", err.Error())
}

backup.CSISnapshots = volumeSnapshots
err = c.kbClient.List(context.Background(), vsList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vsList.Items) >= 0 {
volumeSnapshots = vsList.Items
}
err = c.checkVolumeSnapshotReadyToUse(context.Background(), volumeSnapshots, backup.Spec.CSISnapshotTimeout.Duration)
if err != nil {
backupLog.Errorf("fail to wait VolumeSnapshot change to Ready: %s", err.Error())
}
backup.CSISnapshots = volumeSnapshots

if c.volumeSnapshotContentLister != nil {
volumeSnapshotContents, err = c.volumeSnapshotContentLister.List(selector)
if err != nil {
backupLog.Error(err)
}
err = c.kbClient.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vscList.Items) >= 0 {
volumeSnapshotContents = vscList.Items
}

vsClassSet := sets.NewString()
for _, vsc := range volumeSnapshotContents {
// persist the volumesnapshotclasses referenced by vsc
if c.volumeSnapshotClassLister != nil &&
vsc.Spec.VolumeSnapshotClassName != nil &&
!vsClassSet.Has(*vsc.Spec.VolumeSnapshotClassName) {
if vsClass, err := c.volumeSnapshotClassLister.Get(*vsc.Spec.VolumeSnapshotClassName); err != nil {
if vsc.Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*vsc.Spec.VolumeSnapshotClassName) {
vsClass := &snapshotv1api.VolumeSnapshotClass{}
if err := c.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: *vsc.Spec.VolumeSnapshotClassName}, vsClass); err != nil {
backupLog.Error(err)
} else {
vsClassSet.Insert(*vsc.Spec.VolumeSnapshotClassName)
volumeSnapshotClasses = append(volumeSnapshotClasses, vsClass)
volumeSnapshotClasses = append(volumeSnapshotClasses, *vsClass)
}
}

if err := csi.ResetVolumeSnapshotContent(vsc); err != nil {
backupLog.Error(err)
}
Expand Down Expand Up @@ -806,9 +796,9 @@ func persistBackup(backup *pkgbackup.Request,
backupContents, backupLog *os.File,
backupStore persistence.BackupStore,
log logrus.FieldLogger,
csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot,
csiVolumeSnapshotContents []*snapshotv1api.VolumeSnapshotContent,
csiVolumesnapshotClasses []*snapshotv1api.VolumeSnapshotClass,
csiVolumeSnapshots []snapshotv1api.VolumeSnapshot,
csiVolumeSnapshotContents []snapshotv1api.VolumeSnapshotContent,
csiVolumesnapshotClasses []snapshotv1api.VolumeSnapshotClass,
) []error {
persistErrs := []error{}
backupJSON := new(bytes.Buffer)
Expand Down Expand Up @@ -917,7 +907,7 @@ func encodeToJSONGzip(data interface{}, desc string) (*bytes.Buffer, []error) {
// using goroutine here instead of waiting in CSI plugin, because it's not easy to make BackupItemAction
// parallel by now. After BackupItemAction parallel is implemented, this logic should be moved to CSI plugin
// as https://github.com/vmware-tanzu/velero-plugin-for-csi/pull/100
func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, volumesnapshots []*snapshotv1api.VolumeSnapshot,
func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, volumesnapshots []snapshotv1api.VolumeSnapshot,
csiSnapshotTimeout time.Duration) error {
eg, _ := errgroup.WithContext(ctx)
timeout := csiSnapshotTimeout
Expand All @@ -927,7 +917,8 @@ func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, vo
volumeSnapshot := vs
eg.Go(func() error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
tmpVS, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(volumeSnapshot.Namespace).Get(ctx, volumeSnapshot.Name, metav1.GetOptions{})
tmpVS := &snapshotv1api.VolumeSnapshot{}
err := c.kbClient.Get(ctx, kbclient.ObjectKey{Name: volumeSnapshot.Name, Namespace: volumeSnapshot.Namespace}, tmpVS)
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name))
}
Expand All @@ -952,28 +943,29 @@ func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, vo
// which will cause snapshot deletion on cloud provider, then backup cannot restore the PV.
// If DeletionPolicy is Retain, just delete it. If DeletionPolicy is Delete, need to
// change DeletionPolicy to Retain before deleting VS, then change DeletionPolicy back to Delete.
func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []*snapshotv1api.VolumeSnapshot,
volumeSnapshotContents []*snapshotv1api.VolumeSnapshotContent,
func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api.VolumeSnapshot,
volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent,
backup pkgbackup.Request, logger logrus.FieldLogger) {
var wg sync.WaitGroup
vscMap := make(map[string]*snapshotv1api.VolumeSnapshotContent)
vscMap := make(map[string]snapshotv1api.VolumeSnapshotContent)
for _, vsc := range volumeSnapshotContents {
vscMap[vsc.Name] = vsc
}

for _, vs := range volumeSnapshots {
wg.Add(1)
go func(vs *snapshotv1api.VolumeSnapshot) {
go func(vs snapshotv1api.VolumeSnapshot) {
defer wg.Done()
var vsc *snapshotv1api.VolumeSnapshotContent
var vsc snapshotv1api.VolumeSnapshotContent
modifyVSCFlag := false
if vs.Status.BoundVolumeSnapshotContentName != nil &&
len(*vs.Status.BoundVolumeSnapshotContentName) > 0 {
vsc = vscMap[*vs.Status.BoundVolumeSnapshotContentName]
if nil == vsc {
var found bool
if vsc, found = vscMap[*vs.Status.BoundVolumeSnapshotContentName]; !found {
logger.Errorf("Not find %s from the vscMap", vs.Status.BoundVolumeSnapshotContentName)
return
}

if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentDelete {
modifyVSCFlag = true
}
Expand All @@ -987,7 +979,7 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []*snapshotv1api
logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name)
original := vsc.DeepCopy()
vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain
if err := c.kbClient.Patch(context.Background(), vsc, kbclient.MergeFrom(original)); err != nil {
if err := c.kbClient.Patch(context.Background(), &vsc, kbclient.MergeFrom(original)); err != nil {
logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error())
return
}
Expand All @@ -1003,7 +995,7 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []*snapshotv1api

// Delete VolumeSnapshot from cluster
logger.Debugf("Deleting VolumeSnapshotContent %s", vsc.Name)
err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{})
err := c.kbClient.Delete(context.TODO(), &vs)
if err != nil {
logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error())
}
Expand All @@ -1018,18 +1010,19 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []*snapshotv1api
// and Source. Source is updated to let csi-controller thinks the VSC is statically provsisioned with VS.
// Set VolumeSnapshotRef's UID to nil will let the csi-controller finds out the related VS is gone, then
// VSC can be deleted.
func (c *backupController) recreateVolumeSnapshotContent(vsc *snapshotv1api.VolumeSnapshotContent) error {
func (c *backupController) recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent) error {
timeout := 1 * time.Minute
interval := 1 * time.Second

err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshotContents().Delete(context.TODO(), vsc.Name, metav1.DeleteOptions{})
err := c.kbClient.Delete(context.TODO(), &vsc)
if err != nil {
return errors.Wrapf(err, "fail to delete VolumeSnapshotContent: %s", vsc.Name)
}

// Check VolumeSnapshotContents is already deleted, before re-creating it.
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
_, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshotContents().Get(context.TODO(), vsc.Name, metav1.GetOptions{})
tmpVSC := &snapshotv1api.VolumeSnapshotContent{}
err := c.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: vsc.Name}, tmpVSC)
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
Expand Down Expand Up @@ -1058,7 +1051,7 @@ func (c *backupController) recreateVolumeSnapshotContent(vsc *snapshotv1api.Volu
}
// ResourceVersion shouldn't exist for new creation.
vsc.ResourceVersion = ""
_, err = c.volumeSnapshotClient.SnapshotV1().VolumeSnapshotContents().Create(context.TODO(), vsc, metav1.CreateOptions{})
err = c.kbClient.Create(context.TODO(), &vsc)
if err != nil {
return errors.Wrapf(err, "fail to create VolumeSnapshotContent %s", vsc.Name)
}
Expand Down
Loading