Skip to content

Commit

Permalink
BIAv2 async operations controller work
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Seago <[email protected]>
  • Loading branch information
sseago committed Feb 20, 2023
1 parent 7139daf commit 77e445a
Show file tree
Hide file tree
Showing 57 changed files with 3,031 additions and 640 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/5849-sseago
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
BIAv2 async operations controller work
21 changes: 21 additions & 0 deletions config/crd/v1/bases/velero.io_backups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ spec:
type: string
nullable: true
type: array
itemOperationTimeout:
description: ItemOperationTimeout specifies the time used to wait
for asynchronous BackupItemAction operations The default value is
1 hour.
type: string
labelSelector:
description: LabelSelector is a metav1.LabelSelector to filter with
when adding individual objects to the backup. If empty or nil, all
Expand Down Expand Up @@ -415,6 +420,20 @@ spec:
status:
description: BackupStatus captures the current status of a Velero backup.
properties:
asyncBackupItemOperationsAttempted:
description: AsyncBackupItemOperationsAttempted is the total number
of attempted async BackupItemAction operations for this backup.
type: integer
asyncBackupItemOperationsCompleted:
description: AsyncBackupItemOperationsCompleted is the total number
of successfully completed async BackupItemAction operations for
this backup.
type: integer
asyncBackupItemOperationsFailed:
description: AsyncBackupItemOperationsFailed is the total number of
async BackupItemAction operations for this backup which ended with
an error.
type: integer
completionTimestamp:
description: CompletionTimestamp records the time a backup was completed.
Completion time is recorded even on failed backups. Completion time
Expand Down Expand Up @@ -457,6 +476,8 @@ spec:
- InProgress
- WaitingForPluginOperations
- WaitingForPluginOperationsPartiallyFailed
- FinalizingAfterPluginOperations
- FinalizingAfterPluginOperationsPartiallyFailed
- Completed
- PartiallyFailed
- Failed
Expand Down
5 changes: 5 additions & 0 deletions config/crd/v1/bases/velero.io_schedules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ spec:
type: string
nullable: true
type: array
itemOperationTimeout:
description: ItemOperationTimeout specifies the time used to wait
for asynchronous BackupItemAction operations The default value
is 1 hour.
type: string
labelSelector:
description: LabelSelector is a metav1.LabelSelector to filter
with when adding individual objects to the backup. If empty
Expand Down
4 changes: 2 additions & 2 deletions config/crd/v1/crds/crds.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/delete/delete_item_action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// Context provides the necessary environment to run DeleteItemAction plugins
type Context struct {
Backup *velerov1api.Backup
BackupReader io.Reader
BackupReaders []io.Reader
Actions []velero.DeleteItemAction
Filesystem filesystem.Interface
Log logrus.FieldLogger
Expand All @@ -59,7 +59,7 @@ func InvokeDeleteActions(ctx *Context) error {
}

// get items out of backup tarball into a temp directory
dir, err := archive.NewExtractor(ctx.Log, ctx.Filesystem).UnzipAndExtractBackup(ctx.BackupReader)
dir, err := archive.NewExtractor(ctx.Log, ctx.Filesystem).UnzipAndExtractBackup(ctx.BackupReaders)
if err != nil {
return errors.Wrapf(err, "error extracting backup")
}
Expand Down
36 changes: 18 additions & 18 deletions internal/delete/delete_item_action_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {
name string
backup *velerov1api.Backup
apiResources []*test.APIResource
tarball io.Reader
tarballs []io.Reader
actions map[*recordResourcesAction][]string // recordResourceActions are the plugins that will capture item ids, the []string values are the ids we'll test against.
}{
{
name: "single action with no selector runs for all items",
backup: builder.ForBackup("velero", "velero").Result(),
tarball: test.NewTarWriter(t).
tarballs: []io.Reader{test.NewTarWriter(t).
AddItems("pods", builder.ForPod("ns-1", "pod-1").Result(), builder.ForPod("ns-2", "pod-2").Result()).
AddItems("persistentvolumes", builder.ForPersistentVolume("pv-1").Result(), builder.ForPersistentVolume("pv-2").Result()).
Done(),
Done()},
apiResources: []*test.APIResource{test.Pods(), test.PVs()},
actions: map[*recordResourcesAction][]string{
new(recordResourcesAction): {"ns-1/pod-1", "ns-2/pod-2", "pv-1", "pv-2"},
Expand All @@ -65,10 +65,10 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {
{
name: "single action with a resource selector for namespaced resources runs only for matching resources",
backup: builder.ForBackup("velero", "velero").Result(),
tarball: test.NewTarWriter(t).
tarballs: []io.Reader{test.NewTarWriter(t).
AddItems("pods", builder.ForPod("ns-1", "pod-1").Result(), builder.ForPod("ns-2", "pod-2").Result()).
AddItems("persistentvolumes", builder.ForPersistentVolume("pv-1").Result(), builder.ForPersistentVolume("pv-2").Result()).
Done(),
Done()},
apiResources: []*test.APIResource{test.Pods(), test.PVs()},
actions: map[*recordResourcesAction][]string{
new(recordResourcesAction).ForResource("pods"): {"ns-1/pod-1", "ns-2/pod-2"},
Expand All @@ -77,10 +77,10 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {
{
name: "single action with a resource selector for cluster-scoped resources runs only for matching resources",
backup: builder.ForBackup("velero", "velero").Result(),
tarball: test.NewTarWriter(t).
tarballs: []io.Reader{test.NewTarWriter(t).
AddItems("pods", builder.ForPod("ns-1", "pod-1").Result(), builder.ForPod("ns-2", "pod-2").Result()).
AddItems("persistentvolumes", builder.ForPersistentVolume("pv-1").Result(), builder.ForPersistentVolume("pv-2").Result()).
Done(),
Done()},
apiResources: []*test.APIResource{test.Pods(), test.PVs()},
actions: map[*recordResourcesAction][]string{
new(recordResourcesAction).ForResource("persistentvolumes"): {"pv-1", "pv-2"},
Expand All @@ -89,11 +89,11 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {
{
name: "single action with a namespace selector runs only for resources in that namespace",
backup: builder.ForBackup("velero", "velero").Result(),
tarball: test.NewTarWriter(t).
tarballs: []io.Reader{test.NewTarWriter(t).
AddItems("pods", builder.ForPod("ns-1", "pod-1").Result(), builder.ForPod("ns-2", "pod-2").Result()).
AddItems("persistentvolumeclaims", builder.ForPersistentVolumeClaim("ns-1", "pvc-1").Result(), builder.ForPersistentVolumeClaim("ns-2", "pvc-2").Result()).
AddItems("persistentvolumes", builder.ForPersistentVolume("pv-1").Result(), builder.ForPersistentVolume("pv-2").Result()).
Done(),
Done()},
apiResources: []*test.APIResource{test.Pods(), test.PVCs(), test.PVs()},
actions: map[*recordResourcesAction][]string{
new(recordResourcesAction).ForNamespace("ns-1"): {"ns-1/pod-1", "ns-1/pvc-1"},
Expand All @@ -102,11 +102,11 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {
{
name: "multiple actions, each with a different resource selector using short name, run for matching resources",
backup: builder.ForBackup("velero", "velero").Result(),
tarball: test.NewTarWriter(t).
tarballs: []io.Reader{test.NewTarWriter(t).
AddItems("pods", builder.ForPod("ns-1", "pod-1").Result(), builder.ForPod("ns-2", "pod-2").Result()).
AddItems("persistentvolumeclaims", builder.ForPersistentVolumeClaim("ns-1", "pvc-1").Result(), builder.ForPersistentVolumeClaim("ns-2", "pvc-2").Result()).
AddItems("persistentvolumes", builder.ForPersistentVolume("pv-1").Result(), builder.ForPersistentVolume("pv-2").Result()).
Done(),
Done()},
apiResources: []*test.APIResource{test.Pods(), test.PVCs(), test.PVs()},
actions: map[*recordResourcesAction][]string{
new(recordResourcesAction).ForResource("po"): {"ns-1/pod-1", "ns-2/pod-2"},
Expand All @@ -116,10 +116,10 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {
{
name: "actions with selectors that don't match anything don't run for any resources",
backup: builder.ForBackup("velero", "velero").Result(),
tarball: test.NewTarWriter(t).
tarballs: []io.Reader{test.NewTarWriter(t).
AddItems("pods", builder.ForPod("ns-1", "pod-1").Result()).
AddItems("persistentvolumeclaims", builder.ForPersistentVolumeClaim("ns-2", "pvc-2").Result()).
Done(),
Done()},
apiResources: []*test.APIResource{test.Pods(), test.PVCs(), test.PVs()},
actions: map[*recordResourcesAction][]string{
new(recordResourcesAction).ForNamespace("ns-1").ForResource("persistentvolumeclaims"): nil,
Expand All @@ -129,10 +129,10 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {
{
name: "single action with label selector runs only for those items",
backup: builder.ForBackup("velero", "velero").Result(),
tarball: test.NewTarWriter(t).
tarballs: []io.Reader{test.NewTarWriter(t).
AddItems("pods", builder.ForPod("ns-1", "pod-1").ObjectMeta(builder.WithLabels("app", "app1")).Result(), builder.ForPod("ns-2", "pod-2").Result()).
AddItems("persistentvolumeclaims", builder.ForPersistentVolumeClaim("ns-1", "pvc-1").Result(), builder.ForPersistentVolumeClaim("ns-2", "pvc-2").ObjectMeta(builder.WithLabels("app", "app1")).Result()).
Done(),
Done()},
apiResources: []*test.APIResource{test.Pods(), test.PVCs()},
actions: map[*recordResourcesAction][]string{
new(recordResourcesAction).ForLabelSelector("app=app1"): {"ns-1/pod-1", "ns-2/pvc-2"},
Expand All @@ -141,8 +141,8 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {
{
name: "success if resources dir does not exist",
backup: builder.ForBackup("velero", "velero").Result(),
tarball: test.NewTarWriter(t).
Done(),
tarballs: []io.Reader{test.NewTarWriter(t).
Done()},
apiResources: []*test.APIResource{test.Pods(), test.PVCs()},
actions: map[*recordResourcesAction][]string{
new(recordResourcesAction).ForNamespace("ns-1").ForResource("persistentvolumeclaims"): nil,
Expand All @@ -167,7 +167,7 @@ func TestInvokeDeleteItemActionsRunForCorrectItems(t *testing.T) {

c := &Context{
Backup: tc.backup,
BackupReader: tc.tarball,
BackupReaders: tc.tarballs,
Filesystem: fs,
DiscoveryHelper: h.discoveryHelper,
Actions: actions,
Expand Down
39 changes: 38 additions & 1 deletion pkg/apis/velero/v1/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ type BackupSpec struct {
// The default value is 10 minute.
// +optional
CSISnapshotTimeout metav1.Duration `json:"csiSnapshotTimeout,omitempty"`

// ItemOperationTimeout specifies the time used to wait for asynchronous BackupItemAction operations
// The default value is 1 hour.
// +optional
ItemOperationTimeout metav1.Duration `json:"itemOperationTimeout,omitempty"`
}

// BackupHooks contains custom behaviors that should be executed at different phases of the backup.
Expand Down Expand Up @@ -221,7 +226,7 @@ const (

// BackupPhase is a string representation of the lifecycle phase
// of a Velero backup.
// +kubebuilder:validation:Enum=New;FailedValidation;InProgress;WaitingForPluginOperations;WaitingForPluginOperationsPartiallyFailed;Completed;PartiallyFailed;Failed;Deleting
// +kubebuilder:validation:Enum=New;FailedValidation;InProgress;WaitingForPluginOperations;WaitingForPluginOperationsPartiallyFailed;FinalizingAfterPluginOperations;FinalizingAfterPluginOperationsPartiallyFailed;Completed;PartiallyFailed;Failed;Deleting
type BackupPhase string

const (
Expand Down Expand Up @@ -251,6 +256,23 @@ const (
// ongoing. The backup is not usable yet.
BackupPhaseWaitingForPluginOperationsPartiallyFailed BackupPhase = "WaitingForPluginOperationsPartiallyFailed"

// BackupPhaseFinalizingAfterPluginOperations means the backup of
// Kubernetes resources, creation of snapshots, and other
// async plugin operations were successful and snapshot upload and
// other plugin operations are now complete, but the Backup is awaiting
// final update of resources modified during async operations.
// The backup is not usable yet.
BackupPhaseFinalizingAfterPluginOperations BackupPhase = "FinalizingAfterPluginOperations"

// BackupPhaseFinalizingAfterPluginOperationsPartiallyFailed means the backup of
// Kubernetes resources, creation of snapshots, and other
// async plugin operations were successful and snapshot upload and
// other plugin operations are now complete, but one or more errors
// occurred during backup or async operation processing, and the
// Backup is awaiting final update of resources modified during async
// operations. The backup is not usable yet.
BackupPhaseFinalizingAfterPluginOperationsPartiallyFailed BackupPhase = "FinalizingAfterPluginOperationsPartiallyFailed"

// BackupPhaseCompleted means the backup has run successfully without
// errors.
BackupPhaseCompleted BackupPhase = "Completed"
Expand Down Expand Up @@ -351,6 +373,21 @@ type BackupStatus struct {
// completed CSI VolumeSnapshots for this backup.
// +optional
CSIVolumeSnapshotsCompleted int `json:"csiVolumeSnapshotsCompleted,omitempty"`

// AsyncBackupItemOperationsAttempted is the total number of attempted
// async BackupItemAction operations for this backup.
// +optional
AsyncBackupItemOperationsAttempted int `json:"asyncBackupItemOperationsAttempted,omitempty"`

// AsyncBackupItemOperationsCompleted is the total number of successfully completed
// async BackupItemAction operations for this backup.
// +optional
AsyncBackupItemOperationsCompleted int `json:"asyncBackupItemOperationsCompleted,omitempty"`

// AsyncBackupItemOperationsFailed is the total number of async
// BackupItemAction operations for this backup which ended with an error.
// +optional
AsyncBackupItemOperationsFailed int `json:"asyncBackupItemOperationsFailed,omitempty"`
}

// BackupProgress stores information about the progress of a Backup's execution.
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/velero/v1/download_request_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type DownloadTargetKind string
const (
DownloadTargetKindBackupLog DownloadTargetKind = "BackupLog"
DownloadTargetKindBackupContents DownloadTargetKind = "BackupContents"
DownloadTargetKindBackupContentsFinalUpdates DownloadTargetKind = "BackupContentsFinalUpdates"
DownloadTargetKindBackupVolumeSnapshots DownloadTargetKind = "BackupVolumeSnapshots"
DownloadTargetKindBackupItemOperations DownloadTargetKind = "BackupItemOperations"
DownloadTargetKindBackupResourceList DownloadTargetKind = "BackupResourceList"
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/velero/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 52 additions & 35 deletions pkg/archive/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,27 @@ func NewExtractor(log logrus.FieldLogger, fs filesystem.Interface) *Extractor {
}

// UnzipAndExtractBackup extracts a reader on a gzipped tarball to a local temp directory
func (e *Extractor) UnzipAndExtractBackup(src io.Reader) (string, error) {
gzr, err := gzip.NewReader(src)
if err != nil {
e.log.Infof("error creating gzip reader: %v", err)
return "", err
func (e *Extractor) UnzipAndExtractBackup(sources []io.Reader) (string, error) {
var gzrs []*gzip.Reader
defer func() {
for _, gzr := range gzrs {
gzr.Close()
}
}()
for _, src := range sources {
gzr, err := gzip.NewReader(src)
if err != nil {
e.log.Infof("error creating gzip reader: %v", err)
return "", err
}
gzrs = append(gzrs, gzr)
}
defer gzr.Close()

return e.readBackup(tar.NewReader(gzr))
var tarReaders []*tar.Reader
for _, gzr := range gzrs {
tarReaders = append(tarReaders, tar.NewReader(gzr))
}
return e.readBackup(tarReaders)
}

func (e *Extractor) writeFile(target string, tarRdr *tar.Reader) error {
Expand All @@ -66,46 +78,51 @@ func (e *Extractor) writeFile(target string, tarRdr *tar.Reader) error {
return nil
}

func (e *Extractor) readBackup(tarRdr *tar.Reader) (string, error) {
// Reads from a slice of tar.Reader objects. Files in the later tar archives will overwrite files
// with the same path in the same archive. This allows later updates to "modify" files in the
// archive without having to recreate the entire archive.
func (e *Extractor) readBackup(tarRdrs []*tar.Reader) (string, error) {
dir, err := e.fs.TempDir("", "")
if err != nil {
e.log.Infof("error creating temp dir: %v", err)
return "", err
}

for {
header, err := tarRdr.Next()
for _, tarRdr := range tarRdrs {
for {
header, err := tarRdr.Next()

if err == io.EOF {
break
}
if err != nil {
e.log.Infof("error reading tar: %v", err)
return "", err
}

target := filepath.Join(dir, header.Name) //nolint:gosec

switch header.Typeflag {
case tar.TypeDir:
err := e.fs.MkdirAll(target, header.FileInfo().Mode())
if err != nil {
e.log.Infof("mkdirall error: %v", err)
return "", err
if err == io.EOF {
break
}

case tar.TypeReg:
// make sure we have the directory created
err := e.fs.MkdirAll(filepath.Dir(target), header.FileInfo().Mode())
if err != nil {
e.log.Infof("mkdirall error: %v", err)
e.log.Infof("error reading tar: %v", err)
return "", err
}

// create the file
if err := e.writeFile(target, tarRdr); err != nil {
e.log.Infof("error copying: %v", err)
return "", err
target := filepath.Join(dir, header.Name) //nolint:gosec

switch header.Typeflag {
case tar.TypeDir:
err := e.fs.MkdirAll(target, header.FileInfo().Mode())
if err != nil {
e.log.Infof("mkdirall error: %v", err)
return "", err
}

case tar.TypeReg:
// make sure we have the directory created
err := e.fs.MkdirAll(filepath.Dir(target), header.FileInfo().Mode())
if err != nil {
e.log.Infof("mkdirall error: %v", err)
return "", err
}

// create the file
if err := e.writeFile(target, tarRdr); err != nil {
e.log.Infof("error copying: %v", err)
return "", err
}
}
}
}
Expand Down
Loading

0 comments on commit 77e445a

Please sign in to comment.