Skip to content

Commit

Permalink
delete old deletion requests for backup when processing a new one
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <[email protected]>
  • Loading branch information
skriss committed Jul 20, 2018
1 parent 85a61b8 commit 78cbdf9
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 77 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ func (s *server) runControllers(config *api.Config) error {
gcController := controller.NewGCController(
s.logger,
s.sharedInformerFactory.Ark().V1().Backups(),
s.sharedInformerFactory.Ark().V1().DeleteBackupRequests(),
s.arkClient.ArkV1(),
config.GCSyncPeriod.Duration,
)
Expand Down
34 changes: 32 additions & 2 deletions pkg/controller/backup_deletion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
)

Expand Down Expand Up @@ -103,8 +104,7 @@ func NewBackupDeletionController(

deleteBackupRequestInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueue,
UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) },
AddFunc: c.enqueue,
},
)

Expand Down Expand Up @@ -162,6 +162,12 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return err
}

// Remove any existing deletion requests for this backup so we only have
// one at a time
if errs := c.deleteExistingDeletionRequests(req, log); errs != nil {
return kubeerrs.NewAggregate(errs)
}

// Don't allow deleting an in-progress backup
if c.backupTracker.Contains(req.Namespace, req.Spec.BackupName) {
_, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
Expand Down Expand Up @@ -303,6 +309,30 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return nil
}

func (c *backupDeletionController) deleteExistingDeletionRequests(req *v1.DeleteBackupRequest, log logrus.FieldLogger) []error {
log.Info("Removing existing deletion requests for backup")
selector := labels.SelectorFromSet(labels.Set(map[string]string{
v1.BackupNameLabel: req.Spec.BackupName,
}))
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(req.Namespace).List(selector)
if err != nil {
return []error{errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")}
}

var errs []error
for _, dbr := range dbrs {
if dbr.Name == req.Name {
continue
}

if err := c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Delete(dbr.Name, nil); err != nil {
errs = append(errs, errors.WithStack(err))
}
}

return errs
}

func (c *backupDeletionController) deleteResticSnapshots(backup *v1.Backup) []error {
if c.resticMgr == nil {
return nil
Expand Down
123 changes: 56 additions & 67 deletions pkg/controller/backup_deletion_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package controller

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -26,7 +25,6 @@ import (
pkgbackup "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/util/kube"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
Expand All @@ -36,74 +34,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing"
)

func TestBackupDeletionControllerControllerHasUpdateFunc(t *testing.T) {
req := pkgbackup.NewDeleteBackupRequest("foo", "uid")
req.Namespace = "heptio-ark"
expected := kube.NamespaceAndName(req)

client := fake.NewSimpleClientset(req)

fakeWatch := watch.NewFake()
defer fakeWatch.Stop()
client.PrependWatchReactor("deletebackuprequests", core.DefaultWatchReactor(fakeWatch, nil))

sharedInformers := informers.NewSharedInformerFactory(client, 0)

controller := NewBackupDeletionController(
arktest.NewLogger(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(), // deleteBackupRequestClient
client.ArkV1(), // backupClient
nil, // snapshotService
nil, // backupService
"bucket",
sharedInformers.Ark().V1().Restores(),
client.ArkV1(), // restoreClient
NewBackupTracker(),
nil, // restic repository manager
sharedInformers.Ark().V1().PodVolumeBackups(),
).(*backupDeletionController)

// disable resync handler since we don't want to test it here
controller.resyncFunc = nil

keys := make(chan string)

controller.syncHandler = func(key string) error {
keys <- key
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

go sharedInformers.Start(ctx.Done())
go controller.Run(ctx, 1)

// wait for the AddFunc
select {
case <-ctx.Done():
t.Fatal("test timed out waiting for AddFunc")
case key := <-keys:
assert.Equal(t, expected, key)
}

req.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
fakeWatch.Add(req)

// wait for the UpdateFunc
select {
case <-ctx.Done():
t.Fatal("test timed out waiting for UpdateFunc")
case key := <-keys:
assert.Equal(t, expected, key)
}
}

func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
Expand Down Expand Up @@ -236,6 +169,62 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
assert.Equal(t, expectedActions, td.client.Actions())
})

t.Run("existing deletion requests for the backup are deleted", func(t *testing.T) {
td := setupBackupDeletionControllerTest()
defer td.backupService.AssertExpectations(t)

// add the backup to the tracker so the execution of processRequest doesn't progress
// past checking for an in-progress backup. this makes validation easier.
td.controller.backupTracker.Add(td.req.Namespace, td.req.Spec.BackupName)

require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(td.req))

existing := &v1.DeleteBackupRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: td.req.Namespace,
Name: "bar",
Labels: map[string]string{
v1.BackupNameLabel: td.req.Spec.BackupName,
},
},
Spec: v1.DeleteBackupRequestSpec{
BackupName: td.req.Spec.BackupName,
},
}
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(existing))
_, err := td.client.ArkV1().DeleteBackupRequests(td.req.Namespace).Create(existing)
require.NoError(t, err)

require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(
&v1.DeleteBackupRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: td.req.Namespace,
Name: "bar-2",
Labels: map[string]string{
v1.BackupNameLabel: "some-other-backup",
},
},
Spec: v1.DeleteBackupRequestSpec{
BackupName: "some-other-backup",
},
},
))

assert.NoError(t, td.controller.processRequest(td.req))

expectedDeleteAction := core.NewDeleteAction(
v1.SchemeGroupVersion.WithResource("deletebackuprequests"),
td.req.Namespace,
"bar",
)

// first action is the Create of an existing DBR for the backup as part of test data setup
// second action is the Delete of the existing DBR, which we're validating
// third action is the Patch of the DBR to set it to processed with an error
require.Len(t, td.client.Actions(), 3)
assert.Equal(t, expectedDeleteAction, td.client.Actions()[1])
})

t.Run("deleting an in progress backup isn't allowed", func(t *testing.T) {
td := setupBackupDeletionControllerTest()
defer td.backupService.AssertExpectations(t)
Expand Down
35 changes: 31 additions & 4 deletions pkg/controller/gc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"

arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
Expand All @@ -39,6 +40,7 @@ type gcController struct {

logger logrus.FieldLogger
backupLister listers.BackupLister
deleteBackupRequestLister listers.DeleteBackupRequestLister
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter
syncPeriod time.Duration

Expand All @@ -49,6 +51,7 @@ type gcController struct {
func NewGCController(
logger logrus.FieldLogger,
backupInformer informers.BackupInformer,
deleteBackupRequestInformer informers.DeleteBackupRequestInformer,
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter,
syncPeriod time.Duration,
) Interface {
Expand All @@ -62,12 +65,16 @@ func NewGCController(
syncPeriod: syncPeriod,
clock: clock.RealClock{},
backupLister: backupInformer.Lister(),
deleteBackupRequestLister: deleteBackupRequestInformer.Lister(),
deleteBackupRequestClient: deleteBackupRequestClient,
logger: logger,
}

c.syncHandler = c.processQueueItem
c.cacheSyncWaiters = append(c.cacheSyncWaiters, backupInformer.Informer().HasSynced)
c.cacheSyncWaiters = append(c.cacheSyncWaiters,
backupInformer.Informer().HasSynced,
deleteBackupRequestInformer.Informer().HasSynced,
)

c.resyncPeriod = syncPeriod
c.resyncFunc = c.enqueueAllBackups
Expand Down Expand Up @@ -130,12 +137,32 @@ func (c *gcController) processQueueItem(key string) error {
return nil
}

log.Info("Backup has expired. Creating a DeleteBackupRequest.")
log.Info("Backup has expired")

req := pkgbackup.NewDeleteBackupRequest(backup.Name, string(backup.UID))
selector := labels.SelectorFromSet(labels.Set(map[string]string{
arkv1api.BackupNameLabel: backup.Name,
arkv1api.BackupUIDLabel: string(backup.UID),
}))

_, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req)
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).List(selector)
if err != nil {
return errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")
}

// if there's an existing unprocessed deletion request for this backup, don't create
// another one
for _, dbr := range dbrs {
switch dbr.Status.Phase {
case "", arkv1api.DeleteBackupRequestPhaseNew, arkv1api.DeleteBackupRequestPhaseInProgress:
log.Info("Backup already has a pending deletion request")
return nil
}
}

log.Info("Creating a new deletion request")
req := pkgbackup.NewDeleteBackupRequest(backup.Name, string(backup.UID))

if _, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req); err != nil {
return errors.Wrap(err, "error creating DeleteBackupRequest")
}

Expand Down
Loading

0 comments on commit 78cbdf9

Please sign in to comment.