Skip to content

Commit

Permalink
Merge pull request #608 from skriss/no-pv-snapshot-if-restic-backup
Browse files Browse the repository at this point in the history
don't snapshot volumes that have been backed up with restic
  • Loading branch information
ncdc authored Jun 28, 2018
2 parents 539de6d + 11c176c commit eaeb9d6
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 60 deletions.
1 change: 1 addition & 0 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io
resourceHooks,
kb.snapshotService,
resticBackupper,
newPVCSnapshotTracker(),
)

for _, group := range kb.discoveryHelper.Resources() {
Expand Down
5 changes: 5 additions & 0 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ func TestBackup(t *testing.T) {
test.expectedHooks,
mock.Anything,
mock.Anything, // restic backupper
mock.Anything, // pvc snapshot tracker
).Return(groupBackupper)

for group, err := range test.backupGroupErrors {
Expand Down Expand Up @@ -606,6 +607,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&mockGroupBackupper{})

assert.NoError(t, b.Backup(&v1.Backup{}, &bytes.Buffer{}, &bytes.Buffer{}, nil))
Expand Down Expand Up @@ -637,6 +639,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&mockGroupBackupper{})

assert.NoError(t, b.Backup(&v1.Backup{}, &bytes.Buffer{}, &bytes.Buffer{}, nil))
Expand Down Expand Up @@ -665,6 +668,7 @@ func (f *mockGroupBackupperFactory) newGroupBackupper(
resourceHooks []resourceHook,
snapshotService cloudprovider.SnapshotService,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
) groupBackupper {
args := f.Called(
log,
Expand All @@ -681,6 +685,7 @@ func (f *mockGroupBackupperFactory) newGroupBackupper(
resourceHooks,
snapshotService,
resticBackupper,
resticSnapshotTracker,
)
return args.Get(0).(groupBackupper)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/backup/group_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type groupBackupperFactory interface {
resourceHooks []resourceHook,
snapshotService cloudprovider.SnapshotService,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
) groupBackupper
}

Expand All @@ -70,6 +71,7 @@ func (f *defaultGroupBackupperFactory) newGroupBackupper(
resourceHooks []resourceHook,
snapshotService cloudprovider.SnapshotService,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
) groupBackupper {
return &defaultGroupBackupper{
log: log,
Expand All @@ -86,6 +88,7 @@ func (f *defaultGroupBackupperFactory) newGroupBackupper(
resourceHooks: resourceHooks,
snapshotService: snapshotService,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
resourceBackupperFactory: &defaultResourceBackupperFactory{},
}
}
Expand All @@ -108,6 +111,7 @@ type defaultGroupBackupper struct {
resourceHooks []resourceHook
snapshotService cloudprovider.SnapshotService
resticBackupper restic.Backupper
resticSnapshotTracker *pvcSnapshotTracker
resourceBackupperFactory resourceBackupperFactory
}

Expand All @@ -131,6 +135,7 @@ func (gb *defaultGroupBackupper) backupGroup(group *metav1.APIResourceList) erro
gb.resourceHooks,
gb.snapshotService,
gb.resticBackupper,
gb.resticSnapshotTracker,
)
)

Expand Down
5 changes: 5 additions & 0 deletions pkg/backup/group_backupper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestBackupGroup(t *testing.T) {
resourceHooks,
nil, // snapshot service
nil, // restic backupper
newPVCSnapshotTracker(),
).(*defaultGroupBackupper)

resourceBackupperFactory := &mockResourceBackupperFactory{}
Expand All @@ -113,6 +114,7 @@ func TestBackupGroup(t *testing.T) {
resourceHooks,
nil,
mock.Anything, // restic backupper
mock.Anything, // pvc snapshot tracker
).Return(resourceBackupper)

group := &metav1.APIResourceList{
Expand Down Expand Up @@ -161,6 +163,7 @@ func (rbf *mockResourceBackupperFactory) newResourceBackupper(
resourceHooks []resourceHook,
snapshotService cloudprovider.SnapshotService,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
) resourceBackupper {
args := rbf.Called(
log,
Expand All @@ -176,6 +179,8 @@ func (rbf *mockResourceBackupperFactory) newResourceBackupper(
tarWriter,
resourceHooks,
snapshotService,
resticBackupper,
resticSnapshotTracker,
)
return args.Get(0).(resourceBackupper)
}
Expand Down
136 changes: 77 additions & 59 deletions pkg/backup/item_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -58,6 +57,7 @@ type itemBackupperFactory interface {
discoveryHelper discovery.Helper,
snapshotService cloudprovider.SnapshotService,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
) ItemBackupper
}

Expand All @@ -75,6 +75,7 @@ func (f *defaultItemBackupperFactory) newItemBackupper(
discoveryHelper discovery.Helper,
snapshotService cloudprovider.SnapshotService,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
) ItemBackupper {
ib := &defaultItemBackupper{
backup: backup,
Expand All @@ -90,7 +91,8 @@ func (f *defaultItemBackupperFactory) newItemBackupper(
itemHookHandler: &defaultItemHookHandler{
podCommandExecutor: podCommandExecutor,
},
resticBackupper: resticBackupper,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
}

// this is for testing purposes
Expand All @@ -104,17 +106,18 @@ type ItemBackupper interface {
}

type defaultItemBackupper struct {
backup *api.Backup
namespaces *collections.IncludesExcludes
resources *collections.IncludesExcludes
backedUpItems map[itemKey]struct{}
actions []resolvedAction
tarWriter tarWriter
resourceHooks []resourceHook
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
snapshotService cloudprovider.SnapshotService
resticBackupper restic.Backupper
backup *api.Backup
namespaces *collections.IncludesExcludes
resources *collections.IncludesExcludes
backedUpItems map[itemKey]struct{}
actions []resolvedAction
tarWriter tarWriter
resourceHooks []resourceHook
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
snapshotService cloudprovider.SnapshotService
resticBackupper restic.Backupper
resticSnapshotTracker *pvcSnapshotTracker

itemHookHandler itemHookHandler
additionalItemBackupper ItemBackupper
Expand Down Expand Up @@ -178,34 +181,53 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
return err
}

backupErrs := make([]error, 0)
err = ib.executeActions(log, obj, groupResource, name, namespace, metadata)
if err != nil {
var (
backupErrs []error
pod *corev1api.Pod
resticVolumesToBackup []string
)

if groupResource == kuberesource.Pods {
// pod needs to be initialized for the unstructured converter
pod = new(corev1api.Pod)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod); err != nil {
backupErrs = append(backupErrs, errors.WithStack(err))
// nil it on error since it's not valid
pod = nil
} else {
// get the volumes to back up using restic, and add any of them that are PVCs to the pvc snapshot
// tracker, so that when we back up PVCs/PVs via an item action in the next step, we don't snapshot
// PVs that will have their data backed up with restic.
resticVolumesToBackup = restic.GetVolumesToBackup(pod)

ib.resticSnapshotTracker.Track(pod, resticVolumesToBackup)
}
}

if err := ib.executeActions(log, obj, groupResource, name, namespace, metadata); err != nil {
log.WithError(err).Error("Error executing item actions")
backupErrs = append(backupErrs, err)
}

if groupResource == kuberesource.PersistentVolumes {
if ib.snapshotService == nil {
log.Debug("Skipping Persistent Volume snapshot because they're not enabled.")
} else {
if err := ib.takePVSnapshot(obj, ib.backup, log); err != nil {
backupErrs = append(backupErrs, err)
}
} else if err := ib.takePVSnapshot(obj, ib.backup, log); err != nil {
backupErrs = append(backupErrs, err)
}
}

if groupResource == kuberesource.Pods && len(restic.GetVolumesToBackup(metadata)) > 0 {
var (
updatedObj runtime.Unstructured
errs []error
)
if groupResource == kuberesource.Pods && pod != nil {
// this function will return partial results, so process volumeSnapshots
// even if there are errors.
volumeSnapshots, errs := ib.backupPodVolumes(log, pod, resticVolumesToBackup)

if updatedObj, errs = backupPodVolumes(log, ib.backup, obj, ib.resticBackupper); len(errs) > 0 {
backupErrs = append(backupErrs, errs...)
} else {
obj = updatedObj
// annotate the pod with the successful volume snapshots
for volume, snapshot := range volumeSnapshots {
restic.SetPodSnapshotAnnotation(metadata, volume, snapshot)
}

backupErrs = append(backupErrs, errs...)
}

log.Debug("Executing post hooks")
Expand Down Expand Up @@ -248,37 +270,19 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
return nil
}

func backupPodVolumes(log logrus.FieldLogger, backup *api.Backup, obj runtime.Unstructured, backupper restic.Backupper) (runtime.Unstructured, []error) {
if backupper == nil {
log.Warn("No restic backupper, not backing up pod's volumes")
return obj, nil
}

pod := new(corev1api.Pod)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod); err != nil {
return nil, []error{errors.WithStack(err)}
// backupPodVolumes triggers restic backups of the specified pod volumes, and returns a map of volume name -> snapshot ID
// for volumes that were successfully backed up, and a slice of any errors that were encountered.
func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) (map[string]string, []error) {
if len(volumes) == 0 {
return nil, nil
}

volumeSnapshots, errs := backupper.BackupPodVolumes(backup, pod, log)
if len(errs) > 0 {
return nil, errs
}
if len(volumeSnapshots) == 0 {
return obj, nil
}

// annotate the pod with the successful volume snapshots
for volume, snapshot := range volumeSnapshots {
restic.SetPodSnapshotAnnotation(pod, volume, snapshot)
}

// convert annotated pod back to unstructured to return
unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod)
if err != nil {
return nil, []error{errors.WithStack(err)}
if ib.resticBackupper == nil {
log.Warn("No restic backupper, not backing up pod's volumes")
return nil, nil
}

return &unstructured.Unstructured{Object: unstructuredObj}, nil
return ib.resticBackupper.BackupPodVolumes(ib.backup, pod, log)
}

func (ib *defaultItemBackupper) executeActions(log logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, name, namespace string, metadata metav1.Object) error {
Expand Down Expand Up @@ -347,15 +351,29 @@ const zoneLabel = "failure-domain.beta.kubernetes.io/zone"
// takePVSnapshot triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided
// backup has volume snapshots enabled and the PV is of a compatible type. Also records cloud
// disk type and IOPS (if applicable) to be able to restore to current state later.
func (ib *defaultItemBackupper) takePVSnapshot(pv runtime.Unstructured, backup *api.Backup, log logrus.FieldLogger) error {
func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, backup *api.Backup, log logrus.FieldLogger) error {
log.Info("Executing takePVSnapshot")

if backup.Spec.SnapshotVolumes != nil && !*backup.Spec.SnapshotVolumes {
log.Info("Backup has volume snapshots disabled; skipping volume snapshot action.")
return nil
}

metadata, err := meta.Accessor(pv)
pv := new(corev1api.PersistentVolume)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pv); err != nil {
return errors.WithStack(err)
}

// If this PV is claimed, see if we've already taken a (restic) snapshot of the contents
// of this PV. If so, don't take a snapshot.
if pv.Spec.ClaimRef != nil {
if ib.resticSnapshotTracker.Has(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name) {
log.Info("Skipping Persistent Volume snapshot because volume has already been backed up.")
return nil
}
}

metadata, err := meta.Accessor(obj)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -370,7 +388,7 @@ func (ib *defaultItemBackupper) takePVSnapshot(pv runtime.Unstructured, backup *
log.Infof("label %q is not present on PersistentVolume", zoneLabel)
}

volumeID, err := ib.snapshotService.GetVolumeID(pv)
volumeID, err := ib.snapshotService.GetVolumeID(obj)
if err != nil {
return errors.Wrapf(err, "error getting volume ID for PersistentVolume")
}
Expand Down
Loading

0 comments on commit eaeb9d6

Please sign in to comment.