Skip to content

Commit

Permalink
only sync a backup location if it's changed since last sync
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <[email protected]>
  • Loading branch information
skriss committed Oct 3, 2018
1 parent 82ab2d7 commit eb709b8
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 107 deletions.
11 changes: 8 additions & 3 deletions pkg/apis/ark/v1/backup_storage_location.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

package v1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down Expand Up @@ -89,6 +92,8 @@ const (

// BackupStorageLocationStatus describes the current status of an Ark BackupStorageLocation.
type BackupStorageLocationStatus struct {
Phase BackupStorageLocationPhase `json:"phase,omitempty"`
AccessMode BackupStorageLocationAccessMode `json:"accessMode,omitempty"`
Phase BackupStorageLocationPhase `json:"phase,omitempty"`
AccessMode BackupStorageLocationAccessMode `json:"accessMode,omitempty"`
LastSyncedRevision types.UID `json:"lastSyncedRevision,omitempty"`
LastSyncedTime metav1.Time `json:"lastSyncedTime,omitempty"`
}
3 changes: 2 additions & 1 deletion pkg/apis/ark/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (s *server) loadConfig() (*api.Config, error) {
}

const (
defaultBackupSyncPeriod = 60 * time.Minute
defaultBackupSyncPeriod = time.Minute
defaultPodVolumeOperationTimeout = 60 * time.Minute
)

Expand Down Expand Up @@ -601,6 +601,7 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
}

backupSyncController := controller.NewBackupSyncController(
s.arkClient.ArkV1(),
s.arkClient.ArkV1(),
s.sharedInformerFactory.Ark().V1().Backups(),
s.sharedInformerFactory.Ark().V1().BackupStorageLocations(),
Expand Down
114 changes: 88 additions & 26 deletions pkg/controller/backup_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"encoding/json"
"time"

"github.com/pkg/errors"
Expand All @@ -25,6 +26,7 @@ import (
kuberrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"

Expand All @@ -34,14 +36,14 @@ import (
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/persistence"
"github.com/heptio/ark/pkg/plugin"
"github.com/heptio/ark/pkg/util/kube"
"github.com/heptio/ark/pkg/util/stringslice"
)

type backupSyncController struct {
*genericController

client arkv1client.BackupsGetter
backupClient arkv1client.BackupsGetter
backupLocationClient arkv1client.BackupStorageLocationsGetter
backupLister listers.BackupLister
backupStorageLocationLister listers.BackupStorageLocationLister
namespace string
Expand All @@ -51,7 +53,8 @@ type backupSyncController struct {
}

func NewBackupSyncController(
client arkv1client.BackupsGetter,
backupClient arkv1client.BackupsGetter,
backupLocationClient arkv1client.BackupStorageLocationsGetter,
backupInformer informers.BackupInformer,
backupStorageLocationInformer informers.BackupStorageLocationInformer,
syncPeriod time.Duration,
Expand All @@ -67,7 +70,8 @@ func NewBackupSyncController(

c := &backupSyncController{
genericController: newGenericController("backup-sync", logger),
client: client,
backupClient: backupClient,
backupLocationClient: backupLocationClient,
namespace: namespace,
defaultBackupLocation: defaultBackupLocation,
backupLister: backupInformer.Lister(),
Expand All @@ -91,8 +95,35 @@ func NewBackupSyncController(

const gcFinalizer = "gc.ark.heptio.com"

func shouldSync(location *arkv1api.BackupStorageLocation, now time.Time, backupStore persistence.BackupStore, log logrus.FieldLogger) (bool, string) {
log = log.WithFields(map[string]interface{}{
"lastSyncedRevision": location.Status.LastSyncedRevision,
"lastSyncedTime": location.Status.LastSyncedTime.Time.Format(time.RFC1123Z),
})

revision, err := backupStore.GetRevision()
if err != nil {
log.WithError(err).Info("Error getting backup store's revision, syncing")
return true, ""
}
log = log.WithField("revision", revision)

if location.Status.LastSyncedTime.Add(time.Hour).Before(now) {
log.Infof("Backup location hasn't been synced in more than %s, syncing", time.Hour)
return true, revision
}

if string(location.Status.LastSyncedRevision) != revision {
log.Info("Backup location hasn't been synced since its last modification, syncing")
return true, revision
}

log.Debug("Backup location's contents haven't changed since last sync, not syncing")
return false, ""
}

func (c *backupSyncController) run() {
c.logger.Info("Syncing backups from backup storage into cluster")
c.logger.Info("Checking for backup storage locations to sync into cluster")

locations, err := c.backupStorageLocationLister.BackupStorageLocations(c.namespace).List(labels.Everything())
if err != nil {
Expand All @@ -103,35 +134,37 @@ func (c *backupSyncController) run() {
locations = orderedBackupLocations(locations, c.defaultBackupLocation)

pluginManager := c.newPluginManager(c.logger)
defer pluginManager.CleanupClients()

for _, location := range locations {
log := c.logger.WithField("backupLocation", location.Name)
log.Info("Syncing backups from backup location")

backupStore, err := c.newBackupStore(location, pluginManager, log)
if err != nil {
log.WithError(err).Error("Error getting backup store for location")
continue
}

backupsInBackupStore, err := backupStore.ListBackups()
ok, revision := shouldSync(location, time.Now().UTC(), backupStore, log)
if !ok {
continue
}

res, err := backupStore.ListBackups()
if err != nil {
log.WithError(err).Error("Error listing backups in backup store")
continue
}
backupStoreBackups := sets.NewString(res...)
log.WithField("backupCount", len(backupStoreBackups)).Info("Got backups from backup store")

log.WithField("backupCount", len(backupsInBackupStore)).Info("Got backups from backup store")

cloudBackupNames := sets.NewString()
for _, cloudBackup := range backupsInBackupStore {
log = log.WithField("backup", kube.NamespaceAndName(cloudBackup))
log.Debug("Checking cloud backup to see if it needs to be synced into the cluster")

cloudBackupNames.Insert(cloudBackup.Name)
for backupName := range backupStoreBackups {
log = log.WithField("backup", backupName)
log.Debug("Checking backup store backup to see if it needs to be synced into the cluster")

// use the controller's namespace when getting the backup because that's where we
// are syncing backups to, regardless of the namespace of the cloud backup.
_, err := c.client.Backups(c.namespace).Get(cloudBackup.Name, metav1.GetOptions{})
_, err := c.backupClient.Backups(c.namespace).Get(backupName, metav1.GetOptions{})
if err == nil {
log.Debug("Backup already exists in cluster")
continue
Expand All @@ -140,22 +173,28 @@ func (c *backupSyncController) run() {
log.WithError(errors.WithStack(err)).Error("Error getting backup from client, proceeding with sync into cluster")
}

backup, err := backupStore.GetBackupMetadata(backupName)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting backup metadata from backup store")
continue
}

// remove the pre-v0.8.0 gcFinalizer if it exists
// TODO(1.0): remove this
cloudBackup.Finalizers = stringslice.Except(cloudBackup.Finalizers, gcFinalizer)
cloudBackup.Namespace = c.namespace
cloudBackup.ResourceVersion = ""
backup.Finalizers = stringslice.Except(backup.Finalizers, gcFinalizer)
backup.Namespace = c.namespace
backup.ResourceVersion = ""

// update the StorageLocation field and label since the name of the location
// may be different in this cluster than in the cluster that created the
// backup.
cloudBackup.Spec.StorageLocation = location.Name
if cloudBackup.Labels == nil {
cloudBackup.Labels = make(map[string]string)
backup.Spec.StorageLocation = location.Name
if backup.Labels == nil {
backup.Labels = make(map[string]string)
}
cloudBackup.Labels[arkv1api.StorageLocationLabel] = cloudBackup.Spec.StorageLocation
backup.Labels[arkv1api.StorageLocationLabel] = backup.Spec.StorageLocation

_, err = c.client.Backups(cloudBackup.Namespace).Create(cloudBackup)
_, err = c.backupClient.Backups(backup.Namespace).Create(backup)
switch {
case err != nil && kuberrs.IsAlreadyExists(err):
log.Debug("Backup already exists in cluster")
Expand All @@ -166,7 +205,30 @@ func (c *backupSyncController) run() {
}
}

c.deleteOrphanedBackups(location.Name, cloudBackupNames, log)
c.deleteOrphanedBackups(location.Name, backupStoreBackups, log)

// update the location's status's last-synced fields
patch := map[string]interface{}{
"status": map[string]interface{}{
"lastSyncedTime": time.Now().UTC(),
"lastSyncedRevision": revision,
},
}

patchBytes, err := json.Marshal(patch)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error marshaling last-synced patch to JSON")
continue
}

if _, err = c.backupLocationClient.BackupStorageLocations(c.namespace).Patch(
location.Name,
types.MergePatchType,
patchBytes,
); err != nil {
log.WithError(errors.WithStack(err)).Error("Error patching backup location's last-synced time and revision")
continue
}
}
}

Expand All @@ -192,7 +254,7 @@ func (c *backupSyncController) deleteOrphanedBackups(locationName string, cloudB
continue
}

if err := c.client.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
if err := c.backupClient.Backups(backup.Namespace).Delete(backup.Name, nil); err != nil {
log.WithError(errors.WithStack(err)).Error("Error deleting orphaned backup from cluster")
} else {
log.Debug("Deleted orphaned backup from cluster")
Expand Down
Loading

0 comments on commit eb709b8

Please sign in to comment.