Skip to content

Commit

Permalink
Update garbage collection logic
Browse files Browse the repository at this point in the history
Signed-off-by: Swapnil Mhamane <[email protected]>
  • Loading branch information
Swapnil Mhamane committed Jun 5, 2018
1 parent 819788e commit 6129c45
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 75 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

### Added

- Integration test for AWS
- Incremental backup of etcd: a full snapshot is taken first, then a watch is applied on etcd and the events accumulated over a certain period are persisted to the snapshot store. The restore process restores from the full snapshot, starts the embedded etcd and applies the logged events one by one.

- Initial setup for Integration test for AWS

## 0.2.3 - 2018-05-22

Expand Down
5 changes: 4 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
maxBackups,
deltaSnapshotIntervalSeconds,
time.Duration(etcdConnectionTimeout),
time.Duration(garbageCollectionPeriodSeconds),
tlsConfig)
if err != nil {
logger.Fatalf("Failed to create snapshotter from configured storage provider: %v", err)
Expand All @@ -128,7 +129,8 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
handler.Status = http.StatusServiceUnavailable
continue
}

gcStopCh := make(chan bool)
go ssr.GarbageCollector(gcStopCh)
if err := ssr.Run(stopCh); err != nil {
handler.Status = http.StatusServiceUnavailable
if etcdErr, ok := err.(*errors.EtcdError); ok == true {
Expand All @@ -139,6 +141,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
} else {
handler.Status = http.StatusOK
}
gcStopCh <- true
}
},
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,17 @@ storing snapshots on various cloud storage providers as well as local disk locat
maxBackups,
deltaSnapshotIntervalSeconds,
time.Duration(etcdConnectionTimeout),
time.Duration(garbageCollectionPeriodSeconds),
tlsConfig)
if err != nil {
logger.Fatalf("Failed to create snapshotter: %v", err)
}

gcStopCh := make(chan bool)
go ssr.GarbageCollector(gcStopCh)
if err := ssr.Run(stopCh); err != nil {
logger.Fatalf("Snapshotter failed with error: %v", err)
}
gcStopCh <- true
logger.Info("Shutting down...")
return
},
Expand All @@ -79,6 +82,7 @@ func initializeSnapshotterFlags(cmd *cobra.Command) {
cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-interval-seconds", "i", 10, "Interval in no. of seconds after which delta snapshot will be persisted")
cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", 7, "maximum number of previous backups to keep")
cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout")
cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 30, "Period in seconds for garbage collecting old backups")
cmd.Flags().BoolVar(&insecureTransport, "insecure-transport", true, "disable transport security for client connections")
cmd.Flags().BoolVar(&insecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification")
cmd.Flags().StringVar(&certFile, "cert", "", "identify secure client using this TLS certificate file")
Expand Down
21 changes: 11 additions & 10 deletions cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ var (
logger = logrus.New()

//snapshotter flags
schedule string
etcdEndpoints []string
deltaSnapshotIntervalSeconds int
maxBackups int
etcdConnectionTimeout int
insecureTransport bool
insecureSkipVerify bool
certFile string
keyFile string
caFile string
schedule string
etcdEndpoints []string
deltaSnapshotIntervalSeconds int
maxBackups int
etcdConnectionTimeout int
garbageCollectionPeriodSeconds int
insecureTransport bool
insecureSkipVerify bool
certFile string
keyFile string
caFile string

//server flags
port int
Expand Down
2 changes: 1 addition & 1 deletion pkg/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Do(retryFunc func() error, config *Config) error {
for n < config.Attempts {
delayTime := config.Delay * (1 << (n - 1))
time.Sleep((time.Duration)(delayTime) * config.Units)
config.Logger.Infof("Job attempt: %d", n)
config.Logger.Infof("Job attempt: %d", n+1)
err = retryFunc()
if err == nil {
return nil
Expand Down
49 changes: 45 additions & 4 deletions pkg/snapshot/restorer/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
if err := r.restoreFromBaseSnapshot(ro); err != nil {
return fmt.Errorf("failed to restore from the base snapshot :%v", err)
}

if len(ro.DeltaSnapList) == 0 {
r.logger.Infof("No delta snapshots present over base snapshot.")
return nil
}
r.logger.Infof("Starting embedded etcd server...")
e, err := startEmbeddedEtcd(ro)
if err != nil {
Expand All @@ -73,7 +76,8 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
return err
}
defer client.Close()
r.logger.Infof("Applying incremental snapshots...")

r.logger.Infof("Applying delta snapshots...")
return r.applyDeltaSnapshots(client, ro.DeltaSnapList)
}

Expand All @@ -84,7 +88,7 @@ func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error {
r.logger.Warnf("Base snapshot path not provided. Will do nothing.")
return nil
}

r.logger.Infof("Restoring from base snapshot: %s", path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName))
cfg := etcdserver.ServerConfig{
InitialClusterToken: ro.ClusterToken,
InitialPeerURLsMap: ro.ClusterURLs,
Expand Down Expand Up @@ -311,16 +315,52 @@ func startEmbeddedEtcd(ro RestoreOptions) (*embed.Etcd, error) {

// applyDeltaSnapshots applies the events from the time-sorted list of delta snapshots to etcd sequentially
func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, snapList snapstore.SnapList) error {
for _, snap := range snapList {
firstDeltaSnap := snapList[0]
if err := r.applyFirstDeltaSnapshot(client, *firstDeltaSnap); err != nil {
return err
}
for _, snap := range snapList[1:] {
if err := r.applyDeltaSnapshot(client, *snap); err != nil {
return err
}
}
return nil
}

// applyFirstDeltaSnapshot applies the events from the first delta snapshot to etcd.
// It reads the events stored in snap, asks etcd for its latest revision and applies
// only the events whose mod revision is greater than that revision. It returns an
// error if the events cannot be read, if the revision lookup fails, or if applying
// the events fails.
func (r *Restorer) applyFirstDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot) error {
	r.logger.Infof("Applying first delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName))
	events, err := getEventsFromDeltaSnapshot(r.store, snap)
	if err != nil {
		return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err)
	}

	// Note: The revision in the full snapshot file name might be lower than the actual
	// revision stored in the snapshot, because of the issue referred to below. So, as per
	// the workaround used in our logic of taking delta snapshots, the latest revisions of
	// the full snapshot may overlap with the first few revisions of the first delta snapshot.
	// Hence, we have to additionally take care of that.
	// Refer: https://github.com/coreos/etcd/issues/9037
	ctx := context.TODO()
	resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
	if err != nil {
		return fmt.Errorf("failed to get etcd latest revision: %v", err)
	}
	lastRevision := resp.Header.Revision

	// Start from len(events) so that nothing is re-applied when no event is newer
	// than the revision etcd already holds; a zero default would re-apply every
	// overlapping event in that case.
	// NOTE(review): assumes events are ordered by increasing mod revision — confirm
	// against getEventsFromDeltaSnapshot.
	newRevisionIndex := len(events)
	for index, event := range events {
		if event.EtcdEvent.Kv.ModRevision > lastRevision {
			newRevisionIndex = index
			break
		}
	}

	return applyEventsToEtcd(client, events[newRevisionIndex:])
}

// applyDeltaSnapshot applies the events from a delta snapshot to etcd
func (r *Restorer) applyDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot) error {
r.logger.Infof("Applying delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName))
events, err := getEventsFromDeltaSnapshot(r.store, snap)
if err != nil {
return fmt.Errorf("failed to read events from delta snapshot %s : %v", snap.SnapName, err)
Expand Down Expand Up @@ -355,6 +395,7 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error {
ops = []clientv3.Op{}
ctx = context.TODO()
)

for _, e := range events {
ev := e.EtcdEvent
nextRev := ev.Kv.ModRevision
Expand Down
58 changes: 58 additions & 0 deletions pkg/snapshot/snapshotter/garbagecollector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snapshotter

import (
"path"
"time"

"github.com/gardener/etcd-backup-restore/pkg/snapstore"
)

// GarbageCollector treats older backups as garbage and deletes them.
//
// Once per garbage collection period it lists the snapshots in the store and
// splits the list into streams: a stream starts at index 0 and at every full
// snapshot found after index 0. All streams except the last ssr.maxBackups
// ones are deleted. Failures to list or delete are logged as warnings and do
// not stop the collector. The loop returns when a value is received on stopCh.
func (ssr *Snapshotter) GarbageCollector(stopCh <-chan bool) {
	for {
		select {
		case <-stopCh:
			return
		// garbageCollectionPeriodSeconds is a time.Duration holding a bare count
		// of seconds, so it is scaled by time.Second here.
		case <-time.After(ssr.garbageCollectionPeriodSeconds * time.Second):

			ssr.logger.Infoln("GC: Executing garbage collection...")
			snapList, err := ssr.store.List()
			if err != nil {
				ssr.logger.Warnf("GC: Failed to list snapshots: %v", err)
				continue
			}

			// Collect the index at which each snapshot stream begins.
			// NOTE(review): assumes List() returns snapshots oldest-first — confirm.
			snapLen := len(snapList)
			var snapStreamIndexList []int
			snapStreamIndexList = append(snapStreamIndexList, 0)
			for index := 1; index < snapLen; index++ {
				if snapList[index].Kind == snapstore.SnapshotKindFull {
					snapStreamIndexList = append(snapStreamIndexList, index)
				}
			}

			for snapStreamIndex := 0; snapStreamIndex < len(snapStreamIndexList)-ssr.maxBackups; snapStreamIndex++ {
				// Walk the stream backwards, from the snapshot just before the next
				// stream's start down to this stream's own first snapshot. The lower
				// bound must be the stream's start index, not the stream counter;
				// otherwise later streams revisit snapshots already deleted above.
				streamStart := snapStreamIndexList[snapStreamIndex]
				for i := snapStreamIndexList[snapStreamIndex+1] - 1; i >= streamStart; i-- {
					ssr.logger.Infof("Deleting old snapshot: %s", path.Join(snapList[i].SnapDir, snapList[i].SnapName))
					if err := ssr.store.Delete(*snapList[i]); err != nil {
						ssr.logger.Warnf("Failed to delete snapshot %s: %v", path.Join(snapList[i].SnapDir, snapList[i].SnapName), err)
					}
				}
			}
		}
	}
}
48 changes: 17 additions & 31 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,25 @@ import (
)

// NewSnapshotter returns the snapshotter object.
func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) {
func NewSnapshotter(schedule string, store snapstore.SnapStore, logger *logrus.Logger, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, tlsConfig *TLSConfig) (*Snapshotter, error) {
logger.Printf("Validating schedule...")
sdl, err := cron.ParseStandard(schedule)
if err != nil {
return nil, fmt.Errorf("invalid schedule provied %s : %v", schedule, err)
}
if maxBackups < 1 {
return nil, fmt.Errorf("maximum backups limit should be greater than zero. Input MaxBackups: %s", maxBackups)
}

return &Snapshotter{
logger: logger,
schedule: sdl,
store: store,
maxBackups: maxBackups,
etcdConnectionTimeout: etcdConnectionTimeout,
tlsConfig: tlsConfig,
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
//currentDeltaSnapshotCount: 0,
logger: logger,
schedule: sdl,
store: store,
maxBackups: maxBackups,
etcdConnectionTimeout: etcdConnectionTimeout,
garbageCollectionPeriodSeconds: garbageCollectionPeriodSeconds,
tlsConfig: tlsConfig,
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
}, nil
}

Expand Down Expand Up @@ -135,9 +138,6 @@ func (ssr *Snapshotter) Run(stopCh <-chan struct{}) error {

now = time.Now()
effective = ssr.schedule.Next(now)

// TODO: move this garbage collector to a parallel thread
ssr.garbageCollector()
if effective.IsZero() {
ssr.logger.Infoln("There are no backup scheduled for future. Stopping now.")
return nil
Expand Down Expand Up @@ -166,6 +166,9 @@ func (ssr *Snapshotter) TakeFullSnapshot() error {

ctx, cancel := context.WithTimeout(context.TODO(), ssr.etcdConnectionTimeout*time.Second)
defer cancel()
// Note: The Get and snapshot calls are not atomic, so the revision number in the snapshot file
// may be ahead of the revision found from the Get call. Currently this is the only workaround available.
// Refer: https://github.com/coreos/etcd/issues/9037
resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
if err != nil {
return &errors.EtcdError{
Expand Down Expand Up @@ -199,23 +202,6 @@ func (ssr *Snapshotter) TakeFullSnapshot() error {
return nil
}

// garbageCollector treats older backups as garbage and deletes them.
// It lists the snapshots in the store and deletes entries from the front of
// the list until at most ssr.maxBackups remain. Listing and deletion failures
// are logged as warnings; a deletion failure does not stop the remaining
// deletions.
func (ssr *Snapshotter) garbageCollector() {
	ssr.logger.Infoln("Executing garbage collection...")
	snapList, err := ssr.store.List()
	if err != nil {
		ssr.logger.Warnf("Failed to list snapshots: %v", err)
		return
	}
	snapLen := len(snapList)
	for i := 0; i < (snapLen - ssr.maxBackups); i++ {
		snapPath := path.Join(snapList[i].SnapDir, snapList[i].SnapName)
		ssr.logger.Infof("Deleting old snapshot: %s", snapPath)
		if err := ssr.store.Delete(*snapList[i]); err != nil {
			// Include the underlying error so the cause of the failure is not lost.
			ssr.logger.Warnf("Failed to delete snapshot %s: %v", snapPath, err)
		}
	}
}

// GetTLSClientForEtcd creates an etcd client using the TLS config params.
func GetTLSClientForEtcd(tlsConfig *TLSConfig) (*clientv3.Client, error) {
// set tls if any one tls option set
Expand Down Expand Up @@ -272,13 +258,14 @@ func (ssr *Snapshotter) applyWatch(wg *sync.WaitGroup, fullSnapshotCh chan<- tim
Message: fmt.Sprintf("failed to create etcd client: %v", err),
}
}
go ssr.processWatch(wg, client, fullSnapshotCh, stopCh)
wg.Add(1)
go ssr.processWatch(wg, client, fullSnapshotCh, stopCh)
return nil
}

// processWatch processes the watch to take delta snapshots periodically by collecting the set of events within each period
func (ssr *Snapshotter) processWatch(wg *sync.WaitGroup, client *clientv3.Client, fullSnapshotCh chan<- time.Time, stopCh <-chan bool) {
defer wg.Done()
ctx := context.TODO()
watchCh := client.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(ssr.prevSnapshot.LastRevision+1))
ssr.logger.Infof("Applied watch on etcd from revision: %8d", ssr.prevSnapshot.LastRevision+1)
Expand All @@ -292,7 +279,6 @@ func (ssr *Snapshotter) processWatch(wg *sync.WaitGroup, client *clientv3.Client
select {
case <-stopCh:
ssr.logger.Infoln("Received stop signal. Terminating current watch...")
wg.Done()
return

case wr, ok := <-watchCh:
Expand Down
Loading

0 comments on commit 6129c45

Please sign in to comment.