From 8f5346150c7c94e875b53cf659cc54715f113b45 Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Wed, 15 Aug 2018 16:42:38 -0700 Subject: [PATCH] download request controller: use backup location for object store Signed-off-by: Steve Kriss --- pkg/cmd/server/server.go | 5 +- pkg/controller/download_request_controller.go | 181 ++++++------------ .../download_request_controller_test.go | 51 ++++- 3 files changed, 110 insertions(+), 127 deletions(-) diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index eed216aa4e..2df81996a7 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -737,8 +737,9 @@ func (s *server) runControllers(config *api.Config) error { s.arkClient.ArkV1(), s.sharedInformerFactory.Ark().V1().DownloadRequests(), s.sharedInformerFactory.Ark().V1().Restores(), - s.objectStore, - config.BackupStorageProvider.Bucket, + s.sharedInformerFactory.Ark().V1().BackupStorageLocations(), + s.sharedInformerFactory.Ark().V1().Backups(), + s.pluginRegistry, s.logger, ) wg.Add(1) diff --git a/pkg/controller/download_request_controller.go b/pkg/controller/download_request_controller.go index a497421ccf..cee333162a 100644 --- a/pkg/controller/download_request_controller.go +++ b/pkg/controller/download_request_controller.go @@ -17,9 +17,7 @@ limitations under the License. package controller import ( - "context" "encoding/json" - "sync" "time" jsonpatch "github.com/evanphx/json-patch" @@ -31,32 +29,29 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/cloudprovider" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" + "github.com/heptio/ark/pkg/plugin" "github.com/heptio/ark/pkg/util/kube" ) type downloadRequestController struct { - downloadRequestClient arkv1client.DownloadRequestsGetter - downloadRequestLister listers.DownloadRequestLister - downloadRequestListerSynced cache.InformerSynced - restoreLister listers.RestoreLister - restoreListerSynced cache.InformerSynced - objectStore cloudprovider.ObjectStore - bucket string - syncHandler func(key string) error - queue workqueue.RateLimitingInterface - clock clock.Clock - logger logrus.FieldLogger - - createSignedURL cloudprovider.CreateSignedURLFunc + *genericController + + downloadRequestClient arkv1client.DownloadRequestsGetter + downloadRequestLister listers.DownloadRequestLister + restoreLister listers.RestoreLister + clock clock.Clock + createSignedURL cloudprovider.CreateSignedURLFunc + backupLocationLister listers.BackupStorageLocationLister + backupLister listers.BackupLister + pluginRegistry plugin.Registry + newPluginManager func(logger logrus.FieldLogger, logLevel logrus.Level, pluginRegistry plugin.Registry) plugin.Manager } // NewDownloadRequestController creates a new DownloadRequestController. @@ -64,26 +59,35 @@ func NewDownloadRequestController( downloadRequestClient arkv1client.DownloadRequestsGetter, downloadRequestInformer informers.DownloadRequestInformer, restoreInformer informers.RestoreInformer, - objectStore cloudprovider.ObjectStore, - bucket string, + backupLocationInformer informers.BackupStorageLocationInformer, + backupInformer informers.BackupInformer, + pluginRegistry plugin.Registry, logger logrus.FieldLogger, ) Interface { c := &downloadRequestController{ - downloadRequestClient: downloadRequestClient, - downloadRequestLister: downloadRequestInformer.Lister(), - downloadRequestListerSynced: downloadRequestInformer.Informer().HasSynced, - restoreLister: restoreInformer.Lister(), - restoreListerSynced: restoreInformer.Informer().HasSynced, - objectStore: objectStore, - bucket: bucket, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "downloadrequest"), - clock: &clock.RealClock{}, - logger: logger, - - createSignedURL: cloudprovider.CreateSignedURL, + genericController: newGenericController("downloadrequest", logger), + downloadRequestClient: downloadRequestClient, + downloadRequestLister: downloadRequestInformer.Lister(), + restoreLister: restoreInformer.Lister(), + backupLocationLister: backupLocationInformer.Lister(), + backupLister: backupInformer.Lister(), + + // use variables to refer to these functions so they can be + // replaced with fakes for testing. + createSignedURL: cloudprovider.CreateSignedURL, + newPluginManager: plugin.NewManager, + + clock: &clock.RealClock{}, } c.syncHandler = c.processDownloadRequest + c.cacheSyncWaiters = append( + c.cacheSyncWaiters, + downloadRequestInformer.Informer().HasSynced, + restoreInformer.Informer().HasSynced, + backupLocationInformer.Informer().HasSynced, + backupInformer.Informer().HasSynced, + ) downloadRequestInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ @@ -104,102 +108,21 @@ func NewDownloadRequestController( return c } -// Run is a blocking function that runs the specified number of worker goroutines -// to process items in the work queue. It will return when it receives on the -// ctx.Done() channel. -func (c *downloadRequestController) Run(ctx context.Context, numWorkers int) error { - var wg sync.WaitGroup - - defer func() { - c.logger.Info("Waiting for workers to finish their work") - - c.queue.ShutDown() - - // We have to wait here in the deferred function instead of at the bottom of the function body - // because we have to shut down the queue in order for the workers to shut down gracefully, and - // we want to shut down the queue via defer and not at the end of the body. - wg.Wait() - - c.logger.Info("All workers have finished") - }() - - c.logger.Info("Starting DownloadRequestController") - defer c.logger.Info("Shutting down DownloadRequestController") - - c.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), c.downloadRequestListerSynced, c.restoreListerSynced) { - return errors.New("timed out waiting for caches to sync") - } - c.logger.Info("Caches are synced") - - wg.Add(numWorkers) - for i := 0; i < numWorkers; i++ { - go func() { - wait.Until(c.runWorker, time.Second, ctx.Done()) - wg.Done() - }() - } - - wg.Add(1) - go func() { - wait.Until(c.resync, time.Minute, ctx.Done()) - wg.Done() - }() - - <-ctx.Done() - - return nil -} - -// runWorker runs a worker until the controller's queue indicates it's time to shut down. -func (c *downloadRequestController) runWorker() { - // continually take items off the queue (waits if it's - // empty) until we get a shutdown signal from the queue - for c.processNextWorkItem() { - } -} - -// processNextWorkItem processes a single item from the queue. -func (c *downloadRequestController) processNextWorkItem() bool { - key, quit := c.queue.Get() - if quit { - return false - } - // always call done on this item, since if it fails we'll add - // it back with rate-limiting below - defer c.queue.Done(key) - - err := c.syncHandler(key.(string)) - if err == nil { - // If you had no error, tell the queue to stop tracking history for your key. This will reset - // things like failure counts for per-item rate limiting. - c.queue.Forget(key) - return true - } - - c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") - - // we had an error processing the item so add it back - // into the queue for re-processing with rate-limiting - c.queue.AddRateLimited(key) - - return true -} - // processDownloadRequest is the default per-item sync handler. It generates a pre-signed URL for // a new DownloadRequest or deletes the DownloadRequest if it has expired. func (c *downloadRequestController) processDownloadRequest(key string) error { - logContext := c.logger.WithField("key", key) + log := c.logger.WithField("key", key) - logContext.Debug("Running processDownloadRequest") + log.Debug("Running processDownloadRequest") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return errors.Wrap(err, "error splitting queue key") + log.WithError(err).Error("error splitting queue key") + return nil } downloadRequest, err := c.downloadRequestLister.DownloadRequests(ns).Get(name) if apierrors.IsNotFound(err) { - logContext.Debug("Unable to find DownloadRequest") + log.Debug("Unable to find DownloadRequest") return nil } if err != nil { @@ -208,7 +131,7 @@ func (c *downloadRequestController) processDownloadRequest(key string) error { switch downloadRequest.Status.Phase { case "", v1.DownloadRequestPhaseNew: - return c.generatePreSignedURL(downloadRequest) + return c.generatePreSignedURL(downloadRequest, log) case v1.DownloadRequestPhaseProcessed: return c.deleteIfExpired(downloadRequest) } @@ -220,7 +143,7 @@ const signedURLTTL = 10 * time.Minute // generatePreSignedURL generates a pre-signed URL for downloadRequest, changes the phase to // Processed, and persists the changes to storage. -func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.DownloadRequest) error { +func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.DownloadRequest, log *logrus.Entry) error { update := downloadRequest.DeepCopy() var ( @@ -240,7 +163,25 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow directory = downloadRequest.Spec.Target.Name } - update.Status.DownloadURL, err = c.createSignedURL(c.objectStore, downloadRequest.Spec.Target, c.bucket, directory, signedURLTTL) + backup, err := c.backupLister.Backups(downloadRequest.Namespace).Get(directory) + if err != nil { + return errors.WithStack(err) + } + + backupLocation, err := c.backupLocationLister.BackupStorageLocations(backup.Namespace).Get(backup.Spec.StorageLocation) + if err != nil { + return errors.WithStack(err) + } + + pluginManager := c.newPluginManager(log, log.Level, c.pluginRegistry) + defer pluginManager.CleanupClients() + + objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager) + if err != nil { + return errors.WithStack(err) + } + + update.Status.DownloadURL, err = c.createSignedURL(objectStore, downloadRequest.Spec.Target, backupLocation.Spec.ObjectStorage.Bucket, directory, signedURLTTL) if err != nil { return err } diff --git a/pkg/controller/download_request_controller_test.go b/pkg/controller/download_request_controller_test.go index 25505e36ab..350f6cc195 100644 --- a/pkg/controller/download_request_controller_test.go +++ b/pkg/controller/download_request_controller_test.go @@ -21,7 +21,9 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,6 +33,8 @@ import ( "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" + "github.com/heptio/ark/pkg/plugin" + pluginmocks "github.com/heptio/ark/pkg/plugin/mocks" arktest "github.com/heptio/ark/pkg/util/test" ) @@ -52,9 +56,8 @@ func TestProcessDownloadRequest(t *testing.T) { key: "", }, { - name: "bad key format", - key: "a/b/c", - expectedError: `error splitting queue key: unexpected key format: "a/b/c"`, + name: "bad key format", + key: "a/b/c", }, { name: "backup log request with phase '' gets a url", @@ -109,17 +112,29 @@ func TestProcessDownloadRequest(t *testing.T) { restoresInformer = sharedInformers.Ark().V1().Restores() logger = arktest.NewLogger() clockTime, _ = time.Parse("Mon Jan 2 15:04:05 2006", "Mon Jan 2 15:04:05 2006") + pluginManager = &pluginmocks.Manager{} + objectStore = &arktest.ObjectStore{} ) c := NewDownloadRequestController( client.ArkV1(), downloadRequestsInformer, restoresInformer, - nil, // objectStore - "bucket", + sharedInformers.Ark().V1().BackupStorageLocations(), + sharedInformers.Ark().V1().Backups(), + nil, // pluginRegistry logger, ).(*downloadRequestController) + c.newPluginManager = func(_ logrus.FieldLogger, _ logrus.Level, _ plugin.Registry) plugin.Manager { + return pluginManager + } + + pluginManager.On("GetObjectStore", "objStoreProvider").Return(objectStore, nil) + pluginManager.On("CleanupClients").Return(nil) + + objectStore.On("Init", mock.Anything).Return(nil) + c.clock = clock.NewFakeClock(clockTime) var downloadRequest *v1.DownloadRequest @@ -145,6 +160,32 @@ func TestProcessDownloadRequest(t *testing.T) { restoresInformer.Informer().GetStore().Add(tc.restore) } + if tc.expectedDir != "" { + backup := &v1.Backup{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.expectedDir, + Namespace: v1.DefaultNamespace, + }, + } + require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup)) + + location := &v1.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Name: backup.Spec.StorageLocation, + Namespace: backup.Namespace, + }, + Spec: v1.BackupStorageLocationSpec{ + Provider: "objStoreProvider", + StorageType: v1.StorageType{ + ObjectStorage: &v1.ObjectStorageLocation{ + Bucket: "bucket", + }, + }, + }, + } + require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(location)) + } + c.createSignedURL = func(objectStore cloudprovider.ObjectStore, target v1.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) { require.Equal(t, expectedTarget, target) require.Equal(t, "bucket", bucket)