From 0e94fa37f97de8e6df5c7238503cd5d9d7481d66 Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Tue, 21 Aug 2018 16:52:49 -0700 Subject: [PATCH] update sync controller for backup locations Signed-off-by: Steve Kriss --- pkg/apis/ark/v1/labels_annotations.go | 4 + pkg/cloudprovider/backup_cache.go | 97 ----- pkg/cloudprovider/backup_cache_test.go | 170 -------- pkg/cloudprovider/backup_service.go | 16 - pkg/cmd/server/server.go | 20 +- pkg/controller/backup_controller.go | 6 + pkg/controller/backup_controller_test.go | 21 +- pkg/controller/backup_sync_controller.go | 186 +++++---- pkg/controller/backup_sync_controller_test.go | 391 +++++++++++------- 9 files changed, 379 insertions(+), 532 deletions(-) delete mode 100644 pkg/cloudprovider/backup_cache.go delete mode 100644 pkg/cloudprovider/backup_cache_test.go diff --git a/pkg/apis/ark/v1/labels_annotations.go b/pkg/apis/ark/v1/labels_annotations.go index 00b016a025..300344c5f4 100644 --- a/pkg/apis/ark/v1/labels_annotations.go +++ b/pkg/apis/ark/v1/labels_annotations.go @@ -36,4 +36,8 @@ const ( // a backup/restore-specific timeout value for pod volume operations (i.e. // restic backups/restores). PodVolumeOperationTimeoutAnnotation = "ark.heptio.com/pod-volume-timeout" + + // StorageLocationLabel is the label key used to identify the storage + // location of a backup. + StorageLocationLabel = "ark.heptio.com/storage-location" ) diff --git a/pkg/cloudprovider/backup_cache.go b/pkg/cloudprovider/backup_cache.go deleted file mode 100644 index 06abc99fbc..0000000000 --- a/pkg/cloudprovider/backup_cache.go +++ /dev/null @@ -1,97 +0,0 @@ -/* -Copyright 2017 the Heptio Ark contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloudprovider - -import ( - "context" - "sync" - "time" - - "github.com/sirupsen/logrus" - - "k8s.io/apimachinery/pkg/util/wait" - - "github.com/heptio/ark/pkg/apis/ark/v1" -) - -// backupCacheBucket holds the backups and error from a ListBackups call. -type backupCacheBucket struct { - backups []*v1.Backup - error error -} - -// backupCache caches ListBackups calls, refreshing them periodically. -type backupCache struct { - delegate BackupLister - lock sync.RWMutex - // This doesn't really need to be a map right now, but if we ever move to supporting multiple - // buckets, this will be ready for it. - buckets map[string]*backupCacheBucket - logger logrus.FieldLogger -} - -var _ BackupLister = &backupCache{} - -// NewBackupCache returns a new backup cache that refreshes from delegate every resyncPeriod. -func NewBackupCache(ctx context.Context, delegate BackupLister, resyncPeriod time.Duration, logger logrus.FieldLogger) BackupLister { - c := &backupCache{ - delegate: delegate, - buckets: make(map[string]*backupCacheBucket), - logger: logger, - } - - // Start the goroutine to refresh all buckets every resyncPeriod. This stops when ctx.Done() is - // available. - go wait.Until(c.refresh, resyncPeriod, ctx.Done()) - - return c -} - -// refresh refreshes all the buckets currently in the cache by doing a live lookup via c.delegate. -func (c *backupCache) refresh() { - c.lock.Lock() - defer c.lock.Unlock() - - c.logger.Debug("refreshing all cached backup lists from object storage") - - for bucketName, bucket := range c.buckets { - c.logger.WithField("bucket", bucketName).Debug("Refreshing bucket") - bucket.backups, bucket.error = c.delegate.ListBackups(bucketName) - } -} - -func (c *backupCache) ListBackups(bucketName string) ([]*v1.Backup, error) { - c.lock.RLock() - bucket, found := c.buckets[bucketName] - c.lock.RUnlock() - - logContext := c.logger.WithField("bucket", bucketName) - - if found { - logContext.Debug("Returning cached backup list") - return bucket.backups, bucket.error - } - - logContext.Debug("Bucket is not in cache - doing a live lookup") - - backups, err := c.delegate.ListBackups(bucketName) - c.lock.Lock() - c.buckets[bucketName] = &backupCacheBucket{backups: backups, error: err} - c.lock.Unlock() - - return backups, err -} diff --git a/pkg/cloudprovider/backup_cache_test.go b/pkg/cloudprovider/backup_cache_test.go deleted file mode 100644 index 4eec9481b5..0000000000 --- a/pkg/cloudprovider/backup_cache_test.go +++ /dev/null @@ -1,170 +0,0 @@ -/* -Copyright 2017 the Heptio Ark contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloudprovider - -import ( - "context" - "errors" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/heptio/ark/pkg/apis/ark/v1" - cloudprovidermocks "github.com/heptio/ark/pkg/cloudprovider/mocks" - "github.com/heptio/ark/pkg/util/test" -) - -func TestNewBackupCache(t *testing.T) { - var ( - delegate = &cloudprovidermocks.BackupLister{} - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - logger = test.NewLogger() - ) - defer cancel() - - c := NewBackupCache(ctx, delegate, 100*time.Millisecond, logger) - - // nothing in cache, live lookup - bucket1 := []*v1.Backup{ - test.NewTestBackup().WithName("backup1").Backup, - test.NewTestBackup().WithName("backup2").Backup, - } - delegate.On("ListBackups", "bucket1").Return(bucket1, nil).Once() - - // should be updated via refresh - updatedBucket1 := []*v1.Backup{ - test.NewTestBackup().WithName("backup2").Backup, - } - delegate.On("ListBackups", "bucket1").Return(updatedBucket1, nil) - - // nothing in cache, live lookup - bucket2 := []*v1.Backup{ - test.NewTestBackup().WithName("backup5").Backup, - test.NewTestBackup().WithName("backup6").Backup, - } - delegate.On("ListBackups", "bucket2").Return(bucket2, nil).Once() - - // should be updated via refresh - updatedBucket2 := []*v1.Backup{ - test.NewTestBackup().WithName("backup7").Backup, - } - delegate.On("ListBackups", "bucket2").Return(updatedBucket2, nil) - - backups, err := c.ListBackups("bucket1") - assert.Equal(t, bucket1, backups) - assert.NoError(t, err) - - backups, err = c.ListBackups("bucket2") - assert.Equal(t, bucket2, backups) - assert.NoError(t, err) - - var done1, done2 bool - for { - select { - case <-ctx.Done(): - t.Fatal("timed out") - default: - if done1 && done2 { - return - } - } - - backups, err = c.ListBackups("bucket1") - if len(backups) == 1 { - if assert.Equal(t, updatedBucket1[0], backups[0]) { - done1 = true - } - } - - backups, err = c.ListBackups("bucket2") - if len(backups) == 1 { - if assert.Equal(t, updatedBucket2[0], backups[0]) { - done2 = true - } - } - time.Sleep(100 * time.Millisecond) - } -} - -func TestBackupCacheRefresh(t *testing.T) { - var ( - delegate = &cloudprovidermocks.BackupLister{} - logger = test.NewLogger() - ) - - c := &backupCache{ - delegate: delegate, - buckets: map[string]*backupCacheBucket{ - "bucket1": {}, - "bucket2": {}, - }, - logger: logger, - } - - bucket1 := []*v1.Backup{ - test.NewTestBackup().WithName("backup1").Backup, - test.NewTestBackup().WithName("backup2").Backup, - } - delegate.On("ListBackups", "bucket1").Return(bucket1, nil) - - delegate.On("ListBackups", "bucket2").Return(nil, errors.New("bad")) - - c.refresh() - - assert.Equal(t, bucket1, c.buckets["bucket1"].backups) - assert.NoError(t, c.buckets["bucket1"].error) - - assert.Empty(t, c.buckets["bucket2"].backups) - assert.EqualError(t, c.buckets["bucket2"].error, "bad") -} - -func TestBackupCacheGetAllBackupsUsesCacheIfPresent(t *testing.T) { - var ( - delegate = &cloudprovidermocks.BackupLister{} - logger = test.NewLogger() - bucket1 = []*v1.Backup{ - test.NewTestBackup().WithName("backup1").Backup, - test.NewTestBackup().WithName("backup2").Backup, - } - ) - - c := &backupCache{ - delegate: delegate, - buckets: map[string]*backupCacheBucket{ - "bucket1": { - backups: bucket1, - }, - }, - logger: logger, - } - - bucket2 := []*v1.Backup{ - test.NewTestBackup().WithName("backup3").Backup, - test.NewTestBackup().WithName("backup4").Backup, - } - - delegate.On("ListBackups", "bucket2").Return(bucket2, nil) - - backups, err := c.ListBackups("bucket1") - assert.Equal(t, bucket1, backups) - assert.NoError(t, err) - - backups, err = c.ListBackups("bucket2") - assert.Equal(t, bucket2, backups) - assert.NoError(t, err) -} diff --git a/pkg/cloudprovider/backup_service.go b/pkg/cloudprovider/backup_service.go index 49aaebefc8..e17859af61 100644 --- a/pkg/cloudprovider/backup_service.go +++ b/pkg/cloudprovider/backup_service.go @@ -147,22 +147,6 @@ func DownloadBackup(objectStore ObjectStore, bucket, backupName string) (io.Read return objectStore.GetObject(bucket, getBackupContentsKey(backupName, backupName)) } -type liveBackupLister struct { - logger logrus.FieldLogger - objectStore ObjectStore -} - -func NewLiveBackupLister(logger logrus.FieldLogger, objectStore ObjectStore) BackupLister { - return &liveBackupLister{ - logger: logger, - objectStore: objectStore, - } -} - -func (l *liveBackupLister) ListBackups(bucket string) ([]*api.Backup, error) { - return ListBackups(l.logger, l.objectStore, bucket) -} - func ListBackups(logger logrus.FieldLogger, objectStore ObjectStore, bucket string) ([]*api.Backup, error) { prefixes, err := objectStore.ListCommonPrefixes(bucket, "/") if err != nil { diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 32c20b6eb5..ec8f4d3cbd 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -589,12 +589,6 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B ctx := s.ctx var wg sync.WaitGroup - cloudBackupCacheResyncPeriod := durationMin(controller.GCSyncPeriod, s.config.backupSyncPeriod) - s.logger.Infof("Caching cloud backups every %s", cloudBackupCacheResyncPeriod) - - liveBackupLister := cloudprovider.NewLiveBackupLister(s.logger, s.objectStore) - cachedBackupLister := cloudprovider.NewBackupCache(ctx, liveBackupLister, cloudBackupCacheResyncPeriod, s.logger) - go func() { metricsMux := http.NewServeMux() metricsMux.Handle("/metrics", promhttp.Handler()) @@ -608,12 +602,13 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B backupSyncController := controller.NewBackupSyncController( s.arkClient.ArkV1(), - cachedBackupLister, - config.BackupStorageProvider.Bucket, + s.sharedInformerFactory.Ark().V1().Backups(), + s.sharedInformerFactory.Ark().V1().BackupStorageLocations(), s.config.backupSyncPeriod, s.namespace, - s.sharedInformerFactory.Ark().V1().Backups(), + s.pluginRegistry, s.logger, + s.logLevel, ) wg.Add(1) go func() { @@ -775,7 +770,7 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B // SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS go s.sharedInformerFactory.Start(ctx.Done()) - // Remove this sometime after v0.8.0 + // TODO(1.0): remove cache.WaitForCacheSync(ctx.Done(), s.sharedInformerFactory.Ark().V1().Backups().Informer().HasSynced) s.removeDeprecatedGCFinalizer() @@ -789,9 +784,10 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B return nil } -const gcFinalizer = "gc.ark.heptio.com" - +// TODO(1.0): remove func (s *server) removeDeprecatedGCFinalizer() { + const gcFinalizer = "gc.ark.heptio.com" + backups, err := s.sharedInformerFactory.Ark().V1().Backups().Lister().List(labels.Everything()) if err != nil { s.logger.WithError(errors.WithStack(err)).Error("error listing backups from cache - unable to remove old finalizers") diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 88f62fbe7f..364331734e 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -349,6 +349,12 @@ func (controller *backupController) getLocationAndValidate(itm *api.Backup, defa itm.Spec.StorageLocation = defaultBackupLocation } + // add the storage location as a label for easy filtering later. + if itm.Labels == nil { + itm.Labels = make(map[string]string) + } + itm.Labels[api.StorageLocationLabel] = itm.Spec.StorageLocation + var backupLocation *api.BackupStorageLocation backupLocation, err := controller.backupLocationLister.BackupStorageLocations(itm.Namespace).Get(itm.Spec.StorageLocation) if err != nil { diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 49d9c6876d..9a41d6d290 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -25,13 +25,12 @@ import ( "testing" "time" - "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" core "k8s.io/client-go/testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -364,10 +363,14 @@ func TestProcessBackup(t *testing.T) { type SpecPatch struct { StorageLocation string `json:"storageLocation"` } + type ObjectMetaPatch struct { + Labels map[string]string `json:"labels"` + } type Patch struct { - Status StatusPatch `json:"status"` - Spec SpecPatch `json:"spec,omitempty"` + Status StatusPatch `json:"status"` + Spec SpecPatch `json:"spec,omitempty"` + ObjectMeta ObjectMetaPatch `json:"metadata,omitempty"` } decode := func(decoder *json.Decoder) (interface{}, error) { @@ -389,6 +392,11 @@ func TestProcessBackup(t *testing.T) { Spec: SpecPatch{ StorageLocation: "default", }, + ObjectMeta: ObjectMetaPatch{ + Labels: map[string]string{ + v1.StorageLocationLabel: "default", + }, + }, } } else { expected = Patch{ @@ -397,6 +405,11 @@ func TestProcessBackup(t *testing.T) { Phase: v1.BackupPhaseInProgress, Expiration: expiration, }, + ObjectMeta: ObjectMetaPatch{ + Labels: map[string]string{ + v1.StorageLocationLabel: test.backup.Spec.StorageLocation, + }, + }, } } diff --git a/pkg/controller/backup_sync_controller.go b/pkg/controller/backup_sync_controller.go index 5b3a482fb1..0706108fce 100644 --- a/pkg/controller/backup_sync_controller.go +++ b/pkg/controller/backup_sync_controller.go @@ -17,7 +17,6 @@ limitations under the License. package controller import ( - "context" "time" "github.com/pkg/errors" @@ -27,133 +26,172 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - api "github.com/heptio/ark/pkg/apis/ark/v1" + arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/cloudprovider" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" + "github.com/heptio/ark/pkg/plugin" "github.com/heptio/ark/pkg/util/kube" "github.com/heptio/ark/pkg/util/stringslice" ) type backupSyncController struct { - client arkv1client.BackupsGetter - cloudBackupLister cloudprovider.BackupLister - bucket string - syncPeriod time.Duration - namespace string - backupLister listers.BackupLister - backupInformerSynced cache.InformerSynced - logger logrus.FieldLogger + *genericController + + client arkv1client.BackupsGetter + backupLister listers.BackupLister + backupStorageLocationLister listers.BackupStorageLocationLister + namespace string + newPluginManager func(logrus.FieldLogger) plugin.Manager + listCloudBackups func(logrus.FieldLogger, cloudprovider.ObjectStore, string) ([]*arkv1api.Backup, error) } func NewBackupSyncController( client arkv1client.BackupsGetter, - cloudBackupLister cloudprovider.BackupLister, - bucket string, + backupInformer informers.BackupInformer, + backupStorageLocationInformer informers.BackupStorageLocationInformer, syncPeriod time.Duration, namespace string, - backupInformer informers.BackupInformer, + pluginRegistry plugin.Registry, logger logrus.FieldLogger, + logLevel logrus.Level, ) Interface { if syncPeriod < time.Minute { logger.Infof("Provided backup sync period %v is too short. Setting to 1 minute", syncPeriod) syncPeriod = time.Minute } - return &backupSyncController{ - client: client, - cloudBackupLister: cloudBackupLister, - bucket: bucket, - syncPeriod: syncPeriod, - namespace: namespace, - backupLister: backupInformer.Lister(), - backupInformerSynced: backupInformer.Informer().HasSynced, - logger: logger, + + c := &backupSyncController{ + genericController: newGenericController("backup-sync", logger), + client: client, + namespace: namespace, + backupLister: backupInformer.Lister(), + backupStorageLocationLister: backupStorageLocationInformer.Lister(), + + newPluginManager: func(logger logrus.FieldLogger) plugin.Manager { + return plugin.NewManager(logger, logLevel, pluginRegistry) + }, + listCloudBackups: cloudprovider.ListBackups, } -} -// Run is a blocking function that continually runs the object storage -> Ark API -// sync process according to the controller's syncPeriod. It will return when it -// receives on the ctx.Done() channel. -func (c *backupSyncController) Run(ctx context.Context, workers int) error { - c.logger.Info("Running backup sync controller") - c.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), c.backupInformerSynced) { - return errors.New("timed out waiting for caches to sync") + c.resyncFunc = c.run + c.resyncPeriod = syncPeriod + c.cacheSyncWaiters = []cache.InformerSynced{ + backupInformer.Informer().HasSynced, + backupStorageLocationInformer.Informer().HasSynced, } - c.logger.Info("Caches are synced") - wait.Until(c.run, c.syncPeriod, ctx.Done()) - return nil + + return c } const gcFinalizer = "gc.ark.heptio.com" func (c *backupSyncController) run() { - c.logger.Info("Syncing backups from object storage") - backups, err := c.cloudBackupLister.ListBackups(c.bucket) + c.logger.Info("Syncing backups from backup storage into cluster") + + locations, err := c.backupStorageLocationLister.BackupStorageLocations(c.namespace).List(labels.Everything()) if err != nil { - c.logger.WithError(err).Error("error listing backups") + c.logger.WithError(errors.WithStack(err)).Error("Error getting backup storage locations from lister") return } - c.logger.WithField("backupCount", len(backups)).Info("Got backups from object storage") - cloudBackupNames := sets.NewString() - for _, cloudBackup := range backups { - logContext := c.logger.WithField("backup", kube.NamespaceAndName(cloudBackup)) - logContext.Info("Syncing backup") + pluginManager := c.newPluginManager(c.logger) - cloudBackupNames.Insert(cloudBackup.Name) + for _, location := range locations { + log := c.logger.WithField("backupLocation", location.Name) + log.Info("Syncing backups from backup location") - // If we're syncing backups made by pre-0.8.0 versions, the server removes all finalizers - // faster than the sync finishes. Just process them as we find them. - cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer) - - cloudBackup.Namespace = c.namespace - cloudBackup.ResourceVersion = "" + objectStore, err := getObjectStoreForLocation(location, pluginManager) + if err != nil { + log.WithError(err).Error("Error getting object store for location") + continue + } - // Backup only if backup does not exist in Kubernetes or if we are not able to get the backup for any reason. - _, err := c.client.Backups(cloudBackup.Namespace).Get(cloudBackup.Name, metav1.GetOptions{}) + backupsInBackupStore, err := c.listCloudBackups(log, objectStore, location.Spec.ObjectStorage.Bucket) if err != nil { + log.WithError(err).Error("Error listing backups in object store") + continue + } + + log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from object store") + + cloudBackupNames := sets.NewString() + for _, cloudBackup := range backupsInBackupStore { + log = log.WithField("backup", kube.NamespaceAndName(cloudBackup)) + log.Debug("Checking cloud backup to see if it needs to be synced into the cluster") + + cloudBackupNames.Insert(cloudBackup.Name) + + // use the controller's namespace when getting the backup because that's where we + // are syncing backups to, regardless of the namespace of the cloud backup. + _, err := c.client.Backups(c.namespace).Get(cloudBackup.Name, metav1.GetOptions{}) + if err == nil { + log.Debug("Backup already exists in cluster") + continue + } if !kuberrs.IsNotFound(err) { - logContext.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with backup sync") + log.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with sync into cluster") } - if _, err := c.client.Backups(cloudBackup.Namespace).Create(cloudBackup); err != nil && !kuberrs.IsAlreadyExists(err) { - logContext.WithError(errors.WithStack(err)).Error("Error syncing backup from object storage") + // remove the pre-v0.8.0 gcFinalizer if it exists + // TODO(1.0): remove this + cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer) + cloudBackup.Namespace = c.namespace + cloudBackup.ResourceVersion = "" + + // update the StorageLocation field and label since the name of the location + // may be different in this cluster than in the cluster that created the + // backup. + cloudBackup.Spec.StorageLocation = location.Name + if cloudBackup.Labels == nil { + cloudBackup.Labels = make(map[string]string) + } + cloudBackup.Labels[arkv1api.StorageLocationLabel] = cloudBackup.Spec.StorageLocation + + _, err = c.client.Backups(cloudBackup.Namespace).Create(cloudBackup) + switch { + case err != nil && kuberrs.IsAlreadyExists(err): + log.Debug("Backup already exists in cluster") + case err != nil && !kuberrs.IsAlreadyExists(err): + log.WithError(errors.WithStack(err)).Error("Error syncing backup into cluster") + default: + log.Debug("Synced backup into cluster") } } - } - c.deleteUnused(cloudBackupNames) - return + c.deleteOrphanedBackups(location.Name, cloudBackupNames, log) + } } -// deleteUnused deletes backup objects from Kubernetes if they are complete -// and there is no corresponding backup in the object storage. -func (c *backupSyncController) deleteUnused(cloudBackupNames sets.String) { - // Backups objects in Kubernetes - backups, err := c.backupLister.Backups(c.namespace).List(labels.Everything()) +// deleteOrphanedBackups deletes backup objects from Kubernetes that have the specified location +// and a phase of Completed, but no corresponding backup in object storage. +func (c *backupSyncController) deleteOrphanedBackups(locationName string, cloudBackupNames sets.String, log logrus.FieldLogger) { + locationSelector := labels.Set(map[string]string{ + arkv1api.StorageLocationLabel: locationName, + }).AsSelector() + + backups, err := c.backupLister.Backups(c.namespace).List(locationSelector) if err != nil { - c.logger.WithError(errors.WithStack(err)).Error("Error listing backup from Kubernetes") + log.WithError(errors.WithStack(err)).Error("Error listing backups from cluster") + return } if len(backups) == 0 { return } - // For each completed backup object in Kubernetes, delete it if it - // does not have a corresponding backup in object storage for _, backup := range backups { - if backup.Status.Phase == api.BackupPhaseCompleted && !cloudBackupNames.Has(backup.Name) { - if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil { - c.logger.WithError(errors.WithStack(err)).Error("Error deleting unused backup from Kubernetes") - } else { - c.logger.Debugf("Deleted backup: %s", backup.Name) - } + log = log.WithField("backup", backup.Name) + if backup.Status.Phase != arkv1api.BackupPhaseCompleted || cloudBackupNames.Has(backup.Name) { + continue } - } - return + if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil { + log.WithError(errors.WithStack(err)).Error("Error deleting orphaned backup from cluster") + } else { + log.Debug("Deleted orphaned backup from cluster") + } + } } diff --git a/pkg/controller/backup_sync_controller_test.go b/pkg/controller/backup_sync_controller_test.go index 71df0cf8fb..a91a45c0bf 100644 --- a/pkg/controller/backup_sync_controller_test.go +++ b/pkg/controller/backup_sync_controller_test.go @@ -20,281 +20,354 @@ import ( "testing" "time" - apierrors "k8s.io/apimachinery/pkg/api/errors" + "github.com/pkg/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" core "k8s.io/client-go/testing" - "github.com/heptio/ark/pkg/apis/ark/v1" - cloudprovidermocks "github.com/heptio/ark/pkg/cloudprovider/mocks" + arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" + "github.com/heptio/ark/pkg/plugin" + pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" "github.com/heptio/ark/pkg/util/stringslice" arktest "github.com/heptio/ark/pkg/util/test" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) +func defaultLocationsList(namespace string) []*arkv1api.BackupStorageLocation { + return []*arkv1api.BackupStorageLocation{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "location-1", + }, + Spec: arkv1api.BackupStorageLocationSpec{ + Provider: "objStoreProvider", + StorageType: arkv1api.StorageType{ + ObjectStorage: &arkv1api.ObjectStorageLocation{ + Bucket: "bucket-1", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "location-2", + }, + Spec: arkv1api.BackupStorageLocationSpec{ + Provider: "objStoreProvider", + StorageType: arkv1api.StorageType{ + ObjectStorage: &arkv1api.ObjectStorageLocation{ + Bucket: "bucket-2", + }, + }, + }, + }, + } +} + func TestBackupSyncControllerRun(t *testing.T) { tests := []struct { - name string - listBackupsError error - cloudBackups []*v1.Backup - namespace string - existingBackups sets.String + name string + namespace string + locations []*arkv1api.BackupStorageLocation + cloudBackups map[string][]*arkv1api.Backup + existingBackups []*arkv1api.Backup }{ { name: "no cloud backups", }, { - name: "backup lister returns error on ListBackups", - listBackupsError: errors.New("listBackups"), - }, - { - name: "normal case", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, - }, + name: "normal case", namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBackups: map[string][]*arkv1api.Backup{ + "bucket-1": { + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, + }, + "bucket-2": { + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, + }, + }, }, { - name: "Finalizer gets removed on sync", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithFinalizers(gcFinalizer).Backup, - }, + name: "gcFinalizer (only) gets removed on sync", namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBackups: map[string][]*arkv1api.Backup{ + "bucket-1": { + arktest.NewTestBackup().WithNamespace("ns-1").WithFinalizers("a-finalizer", gcFinalizer, "some-other-finalizer").Backup, + }, + }, }, { - name: "Only target finalizer is removed", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithFinalizers(gcFinalizer, "blah").Backup, + name: "all synced backups get created in Ark server's namespace", + namespace: "heptio-ark", + locations: defaultLocationsList("heptio-ark"), + cloudBackups: map[string][]*arkv1api.Backup{ + "bucket-1": { + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, + }, + "bucket-2": { + arktest.NewTestBackup().WithNamespace("ns-2").WithName("backup-3").Backup, + arktest.NewTestBackup().WithNamespace("heptio-ark").WithName("backup-4").Backup, + }, }, - namespace: "ns-1", }, { - name: "backups get created in Ark server's namespace", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, - arktest.NewTestBackup().WithNamespace("ns-2").WithName("backup-2").Backup, + name: "new backups get synced when some cloud backups already exist in the cluster", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBackups: map[string][]*arkv1api.Backup{ + "bucket-1": { + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, + }, + "bucket-2": { + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-4").Backup, + }, + }, + existingBackups: []*arkv1api.Backup{ + // add a label to each existing backup so we can differentiate it from the cloud + // backup during verification + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel("i-exist", "true").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithLabel("i-exist", "true").Backup, }, - namespace: "heptio-ark", }, { - name: "normal case with backups that already exist in Kubernetes", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, + name: "backup storage location names and labels get updated", + namespace: "ns-1", + locations: defaultLocationsList("ns-1"), + cloudBackups: map[string][]*arkv1api.Backup{ + "bucket-1": { + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithStorageLocation("foo").WithLabel(arkv1api.StorageLocationLabel, "foo").Backup, + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, + }, + "bucket-2": { + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithStorageLocation("bar").WithLabel(arkv1api.StorageLocationLabel, "bar").Backup, + }, }, - existingBackups: sets.NewString("backup-2", "backup-3"), - namespace: "ns-1", }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { var ( - backupLister = &cloudprovidermocks.BackupLister{} client = fake.NewSimpleClientset() sharedInformers = informers.NewSharedInformerFactory(client, 0) - logger = arktest.NewLogger() + pluginManager = &pluginmocks.Manager{} + objectStore = &arktest.ObjectStore{} ) c := NewBackupSyncController( client.ArkV1(), - backupLister, - "bucket", + sharedInformers.Ark().V1().Backups(), + sharedInformers.Ark().V1().BackupStorageLocations(), time.Duration(0), test.namespace, - sharedInformers.Ark().V1().Backups(), - logger, + nil, // pluginRegistry + arktest.NewLogger(), + logrus.DebugLevel, ).(*backupSyncController) - backupLister.On("ListBackups", "bucket").Return(test.cloudBackups, test.listBackupsError) + c.newPluginManager = func(_ logrus.FieldLogger) plugin.Manager { return pluginManager } + pluginManager.On("GetObjectStore", "objStoreProvider").Return(objectStore, nil) + pluginManager.On("CleanupClients").Return(nil) + objectStore.On("Init", mock.Anything).Return(nil) - expectedActions := make([]core.Action, 0) + for _, location := range test.locations { + require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location)) + } - client.PrependReactor("get", "backups", func(action core.Action) (bool, runtime.Object, error) { - getAction := action.(core.GetAction) - if test.existingBackups.Has(getAction.GetName()) { - return true, nil, nil + c.listCloudBackups = func(_ logrus.FieldLogger, _ cloudprovider.ObjectStore, bucket string) ([]*arkv1api.Backup, error) { + backups, ok := test.cloudBackups[bucket] + if !ok { + return nil, errors.New("bucket not found") } - // We return nil in place of the found backup object because - // we exclusively check for the error and don't use the object - // returned by the Get / Backups call. - return true, nil, apierrors.NewNotFound(v1.SchemeGroupVersion.WithResource("backups").GroupResource(), getAction.GetName()) - }) + + return backups, nil + } + + for _, existingBackup := range test.existingBackups { + require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(existingBackup)) + + _, err := client.ArkV1().Backups(test.namespace).Create(existingBackup) + require.NoError(t, err) + } + client.ClearActions() c.run() - // we only expect creates for items within the target bucket - for _, cloudBackup := range test.cloudBackups { - // Verify that the run function stripped the GC finalizer - assert.False(t, stringslice.Has(cloudBackup.Finalizers, gcFinalizer)) - assert.Equal(t, test.namespace, cloudBackup.Namespace) - - actionGet := core.NewGetAction( - v1.SchemeGroupVersion.WithResource("backups"), - test.namespace, - cloudBackup.Name, - ) - expectedActions = append(expectedActions, actionGet) - - if test.existingBackups.Has(cloudBackup.Name) { - continue + for bucket, backups := range test.cloudBackups { + for _, cloudBackup := range backups { + obj, err := client.ArkV1().Backups(test.namespace).Get(cloudBackup.Name, metav1.GetOptions{}) + require.NoError(t, err) + + // did this cloud backup already exist in the cluster? + var existing *arkv1api.Backup + for _, obj := range test.existingBackups { + if obj.Name == cloudBackup.Name { + existing = obj + break + } + } + + if existing != nil { + // if this cloud backup already exists in the cluster, make sure that what we get from the + // client is the existing backup, not the cloud one. + assert.Equal(t, existing, obj) + } else { + // verify that the GC finalizer is removed + assert.Equal(t, stringslice.Except(cloudBackup.Finalizers, gcFinalizer), obj.Finalizers) + + // verify that the storage location field and label are set properly + for _, location := range test.locations { + if location.Spec.ObjectStorage.Bucket == bucket { + assert.Equal(t, location.Name, obj.Spec.StorageLocation) + assert.Equal(t, location.Name, obj.Labels[arkv1api.StorageLocationLabel]) + break + } + } + } } - actionCreate := core.NewCreateAction( - v1.SchemeGroupVersion.WithResource("backups"), - test.namespace, - cloudBackup, - ) - expectedActions = append(expectedActions, actionCreate) } - - assert.Equal(t, expectedActions, client.Actions()) }) } } -func TestDeleteUnused(t *testing.T) { +func TestDeleteOrphanedBackups(t *testing.T) { tests := []struct { name string - cloudBackups []*v1.Backup + cloudBackups sets.String k8sBackups []*arktest.TestBackup namespace string expectedDeletes sets.String }{ { - name: "no overlapping backups", - namespace: "ns-1", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, - }, + name: "no overlapping backups", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), k8sBackups: []*arktest.TestBackup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithPhase(v1.BackupPhaseCompleted), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupB").WithPhase(v1.BackupPhaseCompleted), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupB").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), }, expectedDeletes: sets.NewString("backupA", "backupB", "backupC"), }, { - name: "some overlapping backups", - namespace: "ns-1", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, - }, + name: "some overlapping backups", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), k8sBackups: []*arktest.TestBackup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseCompleted), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseCompleted), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupC").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-C").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), }, - expectedDeletes: sets.NewString("backupC"), + expectedDeletes: sets.NewString("backup-C"), }, { - name: "all overlapping backups", - namespace: "ns-1", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, - }, + name: "all overlapping backups", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), k8sBackups: []*arktest.TestBackup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseCompleted), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseCompleted), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithPhase(v1.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), }, expectedDeletes: sets.NewString(), }, { - name: "no overlapping backups but including backups that are not complete", - namespace: "ns-1", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, - }, + name: "no overlapping backups but including backups that are not complete", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), k8sBackups: []*arktest.TestBackup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithPhase(v1.BackupPhaseCompleted), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("Deleting").WithPhase(v1.BackupPhaseDeleting), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("Failed").WithPhase(v1.BackupPhaseFailed), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("FailedValidation").WithPhase(v1.BackupPhaseFailedValidation), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("InProgress").WithPhase(v1.BackupPhaseInProgress), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("New").WithPhase(v1.BackupPhaseNew), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backupA").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("Deleting").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseDeleting), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("Failed").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseFailed), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("FailedValidation").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseFailedValidation), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("InProgress").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseInProgress), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("New").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseNew), }, expectedDeletes: sets.NewString("backupA"), }, { - name: "all overlapping backups and all backups that are not complete", - namespace: "ns-1", - cloudBackups: []*v1.Backup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup, - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").Backup, - }, + name: "all overlapping backups and all backups that are not complete", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), k8sBackups: []*arktest.TestBackup{ - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithPhase(v1.BackupPhaseFailed), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithPhase(v1.BackupPhaseFailedValidation), - arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithPhase(v1.BackupPhaseInProgress), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseFailed), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseFailedValidation), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-3").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseInProgress), }, expectedDeletes: sets.NewString(), }, + { + name: "no completed backups in other locations are deleted", + namespace: "ns-1", + cloudBackups: sets.NewString("backup-1", "backup-2", "backup-3"), + k8sBackups: []*arktest.TestBackup{ + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-1").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-2").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-C").WithLabel(arkv1api.StorageLocationLabel, "default").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-4").WithLabel(arkv1api.StorageLocationLabel, "alternate").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-5").WithLabel(arkv1api.StorageLocationLabel, "alternate").WithPhase(arkv1api.BackupPhaseCompleted), + arktest.NewTestBackup().WithNamespace("ns-1").WithName("backup-6").WithLabel(arkv1api.StorageLocationLabel, "alternate").WithPhase(arkv1api.BackupPhaseCompleted), + }, + expectedDeletes: sets.NewString("backup-C"), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { var ( - backupLister = &cloudprovidermocks.BackupLister{} client = fake.NewSimpleClientset() sharedInformers = informers.NewSharedInformerFactory(client, 0) - logger = arktest.NewLogger() ) c := NewBackupSyncController( client.ArkV1(), - backupLister, - "bucket", + sharedInformers.Ark().V1().Backups(), + sharedInformers.Ark().V1().BackupStorageLocations(), time.Duration(0), test.namespace, - sharedInformers.Ark().V1().Backups(), - logger, + nil, // pluginRegistry + arktest.NewLogger(), + logrus.InfoLevel, ).(*backupSyncController) expectedDeleteActions := make([]core.Action, 0) - // setup: insert backups into Kubernetes for _, backup := range test.k8sBackups { + // add test backup to informer + require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup.Backup), "Error adding backup to informer") + + // add test backup to client + _, err := client.Ark().Backups(test.namespace).Create(backup.Backup) + require.NoError(t, err, "Error adding backup to clientset") + + // if we expect this backup to be deleted, set up the expected DeleteAction if test.expectedDeletes.Has(backup.Name) { actionDelete := core.NewDeleteAction( - v1.SchemeGroupVersion.WithResource("backups"), + arkv1api.SchemeGroupVersion.WithResource("backups"), test.namespace, backup.Name, ) expectedDeleteActions = append(expectedDeleteActions, actionDelete) } - - // add test backup to informer: - err := sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup.Backup) - assert.NoError(t, err, "Error adding backup to informer") - - // add test backup to kubernetes: - _, err = client.Ark().Backups(test.namespace).Create(backup.Backup) - assert.NoError(t, err, "Error deleting from clientset") - } - - // get names of client backups - testBackupNames := sets.NewString() - for _, cloudBackup := range test.cloudBackups { - testBackupNames.Insert(cloudBackup.Name) } - c.deleteUnused(testBackupNames) + c.deleteOrphanedBackups("default", test.cloudBackups, arktest.NewLogger()) numBackups, err := numBackups(t, client, c.namespace) assert.NoError(t, err)