Merge pull request #710 from skriss/resumable-plugins-rebased
Resumable plugins rebased
nrb authored Jul 31, 2018
2 parents 131afb5 + 1305121 commit 430ec24
Showing 101 changed files with 5,923 additions and 1,823 deletions.
12 changes: 2 additions & 10 deletions pkg/backup/backup.go
@@ -40,14 +40,13 @@ import (
"github.com/heptio/ark/pkg/restic"
"github.com/heptio/ark/pkg/util/collections"
kubeutil "github.com/heptio/ark/pkg/util/kube"
"github.com/heptio/ark/pkg/util/logging"
)

// Backupper performs backups.
type Backupper interface {
// Backup takes a backup using the specification in the api.Backup and writes backup and log data
// to the given writers.
- Backup(backup *api.Backup, backupFile, logFile io.Writer, actions []ItemAction) error
+ Backup(logger logrus.FieldLogger, backup *api.Backup, backupFile io.Writer, actions []ItemAction) error
}

// kubernetesBackupper implements Backupper.
@@ -212,20 +211,13 @@ func getResourceHook(hookSpec api.BackupResourceHookSpec, discoveryHelper discov

// Backup backs up the items specified in the Backup, placing them in a gzip-compressed tar file
// written to backupFile. The finalized api.Backup is written to metadata.
- func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io.Writer, actions []ItemAction) error {
+ func (kb *kubernetesBackupper) Backup(logger logrus.FieldLogger, backup *api.Backup, backupFile io.Writer, actions []ItemAction) error {
gzippedData := gzip.NewWriter(backupFile)
defer gzippedData.Close()

tw := tar.NewWriter(gzippedData)
defer tw.Close()

- gzippedLog := gzip.NewWriter(logFile)
- defer gzippedLog.Close()
-
- logger := logrus.New()
- logger.Out = gzippedLog
- logger.Hooks.Add(&logging.ErrorLocationHook{})
- logger.Hooks.Add(&logging.LogLocationHook{})
log := logger.WithField("backup", kubeutil.NamespaceAndName(backup))
log.Info("Starting backup")

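The shape of this change: Backup no longer builds its own gzip-backed logrus logger from a logFile writer; the caller constructs the logger and passes it in as the first argument. A minimal sketch of the new calling convention (hypothetical helper; logging.DefaultLogger is the same constructor the updated tests below use):

package example

import (
	"bytes"

	"github.com/sirupsen/logrus"

	api "github.com/heptio/ark/pkg/apis/ark/v1"
	"github.com/heptio/ark/pkg/backup"
	"github.com/heptio/ark/pkg/util/logging"
)

// runBackup is a hypothetical caller of the new signature: it owns logger
// construction rather than handing Backup a log-file writer.
func runBackup(b backup.Backupper, spec *api.Backup, actions []backup.ItemAction) error {
	logger := logging.DefaultLogger(logrus.InfoLevel)
	var backupFile bytes.Buffer
	return b.Backup(logger, spec, &backupFile, actions)
}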
4 changes: 2 additions & 2 deletions pkg/backup/backup_pv_action.go
@@ -32,8 +32,8 @@ type backupPVAction struct {
log logrus.FieldLogger
}

- func NewBackupPVAction(log logrus.FieldLogger) ItemAction {
- return &backupPVAction{log: log}
+ func NewBackupPVAction(logger logrus.FieldLogger) ItemAction {
+ return &backupPVAction{log: logger}
}

func (a *backupPVAction) AppliesTo() (ResourceSelector, error) {
26 changes: 6 additions & 20 deletions pkg/backup/backup_test.go
@@ -18,8 +18,6 @@ package backup

import (
"bytes"
"compress/gzip"
"io"
"reflect"
"sort"
"testing"
@@ -46,6 +44,7 @@ import (
"github.com/heptio/ark/pkg/restic"
"github.com/heptio/ark/pkg/util/collections"
kubeutil "github.com/heptio/ark/pkg/util/kube"
"github.com/heptio/ark/pkg/util/logging"
arktest "github.com/heptio/ark/pkg/util/test"
)

@@ -549,22 +548,9 @@ func TestBackup(t *testing.T) {
groupBackupper.On("backupGroup", group).Return(err)
}

- var backupFile, logFile bytes.Buffer
-
- err = b.Backup(test.backup, &backupFile, &logFile, nil)
- defer func() {
- // print log if anything failed
- if t.Failed() {
- gzr, err := gzip.NewReader(&logFile)
- require.NoError(t, err)
- t.Log("Backup log contents:")
- var buf bytes.Buffer
- _, err = io.Copy(&buf, gzr)
- require.NoError(t, err)
- require.NoError(t, gzr.Close())
- t.Log(buf.String())
- }
- }()
+ var backupFile bytes.Buffer
+
+ err = b.Backup(logging.DefaultLogger(logrus.DebugLevel), test.backup, &backupFile, nil)

if test.expectedError != nil {
assert.EqualError(t, err, test.expectedError.Error())
@@ -610,7 +596,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
mock.Anything,
).Return(&mockGroupBackupper{})

- assert.NoError(t, b.Backup(&v1.Backup{}, &bytes.Buffer{}, &bytes.Buffer{}, nil))
+ assert.NoError(t, b.Backup(arktest.NewLogger(), &v1.Backup{}, &bytes.Buffer{}, nil))
groupBackupperFactory.AssertExpectations(t)

// mutate the cohabitatingResources map that was used in the first backup to simulate
@@ -642,7 +628,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
mock.Anything,
).Return(&mockGroupBackupper{})

- assert.NoError(t, b.Backup(&v1.Backup{}, &bytes.Buffer{}, &bytes.Buffer{}, nil))
+ assert.NoError(t, b.Backup(arktest.NewLogger(), &v1.Backup{}, &bytes.Buffer{}, nil))
assert.NotEqual(t, firstCohabitatingResources, secondCohabitatingResources)
for _, resource := range secondCohabitatingResources {
assert.False(t, resource.seen)
5 changes: 0 additions & 5 deletions pkg/backup/item_backupper.go
@@ -41,7 +41,6 @@ import (
"github.com/heptio/ark/pkg/podexec"
"github.com/heptio/ark/pkg/restic"
"github.com/heptio/ark/pkg/util/collections"
"github.com/heptio/ark/pkg/util/logging"
)

type itemBackupperFactory interface {
@@ -320,10 +319,6 @@ func (ib *defaultItemBackupper) executeActions(

log.Info("Executing custom action")

- if logSetter, ok := action.ItemAction.(logging.LogSetter); ok {
- logSetter.SetLog(log)
- }
-
updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backup)
if err != nil {
// We want this to show up in the log file at the place where the error occurs. When we return
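This deletion is the consumer side of the constructor changes above: instead of injecting a logger into each plugin per item via a logging.LogSetter type assertion, actions now receive their logger once, at construction. A sketch of the new wiring (hypothetical helper; constructors as changed elsewhere in this commit):

package example

import (
	"github.com/sirupsen/logrus"

	"github.com/heptio/ark/pkg/backup"
)

// buildActions wires the logger in at construction time, replacing the
// per-item logSetter.SetLog(log) call removed above.
func buildActions(logger logrus.FieldLogger) []backup.ItemAction {
	return []backup.ItemAction{
		backup.NewBackupPVAction(logger),
		backup.NewPodAction(logger),
	}
}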
15 changes: 15 additions & 0 deletions pkg/backup/mocks/item_action.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/backup/pod_action.go
@@ -34,8 +34,8 @@ type podAction struct {
}

// NewPodAction creates a new ItemAction for pods.
- func NewPodAction(log logrus.FieldLogger) ItemAction {
- return &podAction{log: log}
+ func NewPodAction(logger logrus.FieldLogger) ItemAction {
+ return &podAction{log: logger}
}

// AppliesTo returns a ResourceSelector that applies only to pods.
4 changes: 2 additions & 2 deletions pkg/backup/service_account_action.go
@@ -38,7 +38,7 @@ type serviceAccountAction struct {
}

// NewServiceAccountAction creates a new ItemAction for service accounts.
- func NewServiceAccountAction(log logrus.FieldLogger, clusterRoleBindingListers map[string]ClusterRoleBindingLister, discoveryHelper arkdiscovery.Helper) (ItemAction, error) {
+ func NewServiceAccountAction(logger logrus.FieldLogger, clusterRoleBindingListers map[string]ClusterRoleBindingLister, discoveryHelper arkdiscovery.Helper) (ItemAction, error) {
// Look up the supported RBAC version
var supportedAPI metav1.GroupVersionForDiscovery
for _, ag := range discoveryHelper.APIGroups() {
Expand All @@ -58,7 +58,7 @@ func NewServiceAccountAction(log logrus.FieldLogger, clusterRoleBindingListers m
}

return &serviceAccountAction{
- log: log,
+ log: logger,
clusterRoleBindings: crbs,
}, nil
}
15 changes: 15 additions & 0 deletions pkg/buildinfo/version_test.go
@@ -1,3 +1,18 @@
+ /*
+ Copyright 2018 the Heptio Ark contributors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
package buildinfo

import (
15 changes: 15 additions & 0 deletions pkg/client/client_test.go
@@ -1,3 +1,18 @@
+ /*
+ Copyright 2018 the Heptio Ark contributors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
package client

import (
6 changes: 4 additions & 2 deletions pkg/cloudprovider/aws/block_store.go
@@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@@ -40,6 +41,7 @@ const regionKey = "region"
var iopsVolumeTypes = sets.NewString("io1")

type blockStore struct {
+ log logrus.FieldLogger
ec2 *ec2.EC2
}

@@ -56,8 +58,8 @@ func getSession(config *aws.Config) (*session.Session, error) {
return sess, nil
}

- func NewBlockStore() cloudprovider.BlockStore {
- return &blockStore{}
+ func NewBlockStore(logger logrus.FieldLogger) cloudprovider.BlockStore {
+ return &blockStore{log: logger}
}

func (b *blockStore) Init(config map[string]string) error {
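The same constructor injection repeats for every object and block store touched by this commit; the AWS and Azure stores below follow the identical pattern. A sketch of provider setup under the new signature (hypothetical helper; the "region" config key is the regionKey constant from this file):

package example

import (
	"github.com/sirupsen/logrus"

	"github.com/heptio/ark/pkg/cloudprovider"
	"github.com/heptio/ark/pkg/cloudprovider/aws"
)

// newAWSBlockStore injects the logger at construction, then initializes
// the store with its provider-specific config.
func newAWSBlockStore(logger logrus.FieldLogger, region string) (cloudprovider.BlockStore, error) {
	bs := aws.NewBlockStore(logger)
	if err := bs.Init(map[string]string{"region": region}); err != nil {
		return nil, err
	}
	return bs, nil
}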
6 changes: 4 additions & 2 deletions pkg/cloudprovider/aws/object_store.go
@@ -26,6 +26,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/heptio/ark/pkg/cloudprovider"
)
@@ -38,13 +39,14 @@ const (
)

type objectStore struct {
+ log logrus.FieldLogger
s3 *s3.S3
s3Uploader *s3manager.Uploader
kmsKeyID string
}

- func NewObjectStore() cloudprovider.ObjectStore {
- return &objectStore{}
+ func NewObjectStore(logger logrus.FieldLogger) cloudprovider.ObjectStore {
+ return &objectStore{log: logger}
}

func (o *objectStore) Init(config map[string]string) error {
6 changes: 4 additions & 2 deletions pkg/cloudprovider/azure/block_store.go
@@ -31,6 +31,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure"
"github.com/pkg/errors"
"github.com/satori/uuid"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/runtime"

@@ -52,6 +53,7 @@ const (
)

type blockStore struct {
+ log logrus.FieldLogger
disks *disk.DisksClient
snaps *disk.SnapshotsClient
subscription string
@@ -87,8 +89,8 @@ func getConfig() map[string]string {
return cfg
}

- func NewBlockStore() cloudprovider.BlockStore {
- return &blockStore{}
+ func NewBlockStore(logger logrus.FieldLogger) cloudprovider.BlockStore {
+ return &blockStore{log: logger}
}

func (b *blockStore) Init(config map[string]string) error {
6 changes: 4 additions & 2 deletions pkg/cloudprovider/azure/object_store.go
@@ -23,16 +23,18 @@ import (

"github.com/Azure/azure-sdk-for-go/storage"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/heptio/ark/pkg/cloudprovider"
)

type objectStore struct {
blobClient *storage.BlobStorageClient
+ log logrus.FieldLogger
}

- func NewObjectStore() cloudprovider.ObjectStore {
- return &objectStore{}
+ func NewObjectStore(logger logrus.FieldLogger) cloudprovider.ObjectStore {
+ return &objectStore{log: logger}
}

func (o *objectStore) Init(config map[string]string) error {
16 changes: 8 additions & 8 deletions pkg/cloudprovider/backup_cache.go
@@ -28,26 +28,26 @@ import (
"github.com/heptio/ark/pkg/apis/ark/v1"
)

- // backupCacheBucket holds the backups and error from a GetAllBackups call.
+ // backupCacheBucket holds the backups and error from a ListBackups call.
type backupCacheBucket struct {
backups []*v1.Backup
error error
}

- // backupCache caches GetAllBackups calls, refreshing them periodically.
+ // backupCache caches ListBackups calls, refreshing them periodically.
type backupCache struct {
- delegate BackupGetter
+ delegate BackupLister
lock sync.RWMutex
// This doesn't really need to be a map right now, but if we ever move to supporting multiple
// buckets, this will be ready for it.
buckets map[string]*backupCacheBucket
logger logrus.FieldLogger
}

- var _ BackupGetter = &backupCache{}
+ var _ BackupLister = &backupCache{}

// NewBackupCache returns a new backup cache that refreshes from delegate every resyncPeriod.
- func NewBackupCache(ctx context.Context, delegate BackupGetter, resyncPeriod time.Duration, logger logrus.FieldLogger) BackupGetter {
+ func NewBackupCache(ctx context.Context, delegate BackupLister, resyncPeriod time.Duration, logger logrus.FieldLogger) BackupLister {
c := &backupCache{
delegate: delegate,
buckets: make(map[string]*backupCacheBucket),
@@ -70,11 +70,11 @@ func (c *backupCache) refresh() {

for bucketName, bucket := range c.buckets {
c.logger.WithField("bucket", bucketName).Debug("Refreshing bucket")
- bucket.backups, bucket.error = c.delegate.GetAllBackups(bucketName)
+ bucket.backups, bucket.error = c.delegate.ListBackups(bucketName)
}
}

- func (c *backupCache) GetAllBackups(bucketName string) ([]*v1.Backup, error) {
+ func (c *backupCache) ListBackups(bucketName string) ([]*v1.Backup, error) {
c.lock.RLock()
bucket, found := c.buckets[bucketName]
c.lock.RUnlock()
@@ -88,7 +88,7 @@ func (c *backupCache) GetAllBackups(bucketName string) ([]*v1.Backup, error) {

logContext.Debug("Bucket is not in cache - doing a live lookup")

- backups, err := c.delegate.GetAllBackups(bucketName)
+ backups, err := c.delegate.ListBackups(bucketName)
c.lock.Lock()
c.buckets[bucketName] = &backupCacheBucket{backups: backups, error: err}
c.lock.Unlock()
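The rename from BackupGetter/GetAllBackups to BackupLister/ListBackups implies the following interface shape, reconstructed from the methods used in this file (the canonical definition lives elsewhere in pkg/cloudprovider):

package example

import (
	v1 "github.com/heptio/ark/pkg/apis/ark/v1"
)

// BackupLister as implied by this diff: backupCache both satisfies it
// (var _ BackupLister = &backupCache{}) and delegates to one.
type BackupLister interface {
	ListBackups(bucketName string) ([]*v1.Backup, error)
}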