Skip to content

Commit

Permalink
Implement snapshot storage api in rocksdb and badger, and update raft…
Browse files Browse the repository at this point in the history
…wall to use it for backup and restore
  • Loading branch information
gdiazlo committed Mar 14, 2019
1 parent c1dd7fe commit 7fa84a2
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 26 deletions.
8 changes: 4 additions & 4 deletions raftwal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,20 @@ func (fsm *BalloonFSM) Snapshot() (raft.FSMSnapshot, error) {
fsm.restoreMu.Lock()
defer fsm.restoreMu.Unlock()

version, err := fsm.store.GetLastVersion()
id, err := fsm.store.Snapshot()
if err != nil {
return nil, err
}
log.Debugf("Generating snapshot until version: %d (balloon version %d)", version, fsm.balloon.Version())
log.Debugf("Generating snapshot until version: %d (balloon version %d)", id, fsm.balloon.Version())

// Copy the node metadata.
meta, err := json.Marshal(fsm.meta)
if err != nil {
log.Debugf("failed to encode meta for snapshot: %s", err.Error())
return nil, err
}

return &fsmSnapshot{lastVersion: version, store: fsm.store, meta: meta}, nil
// change lastVersion by checkpoint structure
return &fsmSnapshot{id: id, store: fsm.store, meta: meta}, nil
}

// Restore restores the node to a previous state.
Expand Down
8 changes: 4 additions & 4 deletions raftwal/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ import (
)

type fsmSnapshot struct {
lastVersion uint64
store storage.ManagedStore
meta []byte
id uint64
store storage.ManagedStore
meta []byte
}

// Persist writes the snapshot to the given sink.
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
log.Debug("Persisting snapshot...")
err := func() error {
if err := f.store.Backup(sink, f.lastVersion); err != nil {
if err := f.store.Backup(sink, f.id); err != nil {
return err
}
return sink.Close()
Expand Down
6 changes: 5 additions & 1 deletion storage/badger/badger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,11 @@ func (s *BadgerStore) Load(r io.Reader) error {
return s.db.Load(r)
}

func (s *BadgerStore) GetLastVersion() (uint64, error) {
// Take a snapshot of the store, and returns and id
// to be used in the back up process. The state of the
// snapshot is stored in the store instance.
// In badger the id corresponds to the last version stored.
func (s *BadgerStore) Snapshot() (uint64, error) {
var version uint64
err := s.db.View(func(txn *b.Txn) error {
opts := b.DefaultIteratorOptions
Expand Down
4 changes: 2 additions & 2 deletions storage/badger/badger_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestBackupLoad(t *testing.T) {
}
}

version, err := store.GetLastVersion()
version, err := store.Snapshot()
require.NoError(t, err)

backupDir := mustTempDir()
Expand All @@ -251,7 +251,7 @@ func TestBackupLoad(t *testing.T) {
restore, recloseF := openBadgerStore(t)
defer recloseF()
restore.Load(backupFile)
reversion, err := store.GetLastVersion()
reversion, err := store.Snapshot()

require.NoError(t, err)
require.Equal(t, reversion, version, "Error in restored version")
Expand Down
57 changes: 44 additions & 13 deletions storage/rocks/rocksdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"os"

"github.com/bbva/qed/rocksdb"
Expand All @@ -30,6 +30,15 @@ import (

type RocksDBStore struct {
db *rocksdb.DB

// checkpoints are stored in a path on the same
// folder as the database, so rocksdb uses hardlinks instead
// of copies
checkPointPath string

// each checkpoint is created in a subdirectory
// inside checkPointPath folder
checkpoints map[uint64]string
}

type rocksdbOpts struct {
Expand All @@ -55,8 +64,17 @@ func NewRocksDBStoreOpts(opts *rocksdbOpts) (*RocksDBStore, error) {
if err != nil {
return nil, err
}
checkPointPath := opts.Path + "/checkpoints"
err = os.MkdirAll(checkPointPath, 0755)
if err != nil {
return nil, err
}

store := &RocksDBStore{db: db}
store := &RocksDBStore{
db: db,
checkPointPath: checkPointPath,
checkpoints: make(map[uint64]string),
}
return store, nil
}

Expand Down Expand Up @@ -172,26 +190,32 @@ func (s RocksDBStore) Delete(prefix byte, key []byte) error {
return s.db.Delete(rocksdb.NewDefaultWriteOptions(), k)
}

// Backup dumps a protobuf-encoded list of all entries in the database into the
// given writer, that are newer than the specified version.
func (s *RocksDBStore) Backup(w io.Writer, until uint64) error {

// Take a snapshot of the store, and returns and id
// to be used in the back up process. The state of the
// snapshot is stored in the store instance.
func (s *RocksDBStore) Snapshot() (uint64, error) {
// create temp directory
checkDir, err := ioutil.TempDir("", "rocksdb-checkpoint")
if err != nil {
return err
}
id := uint64(len(s.checkpoints) + 1)
checkDir := fmt.Sprintf("%s/rocksdb-checkpoint-%d", s.checkPointPath, id)
os.RemoveAll(checkDir)

// create checkpoint
checkpoint, err := s.db.NewCheckpoint()
if err != nil {
return err
return 0, err
}
defer checkpoint.Destroy()

checkpoint.CreateCheckpoint(checkDir, 0)
defer os.RemoveAll(checkDir)

s.checkpoints[id] = checkDir
return id, nil
}

// Backup dumps a protobuf-encoded list of all entries in the database into the
// given writer, that are newer than the specified version.
func (s *RocksDBStore) Backup(w io.Writer, id uint64) error {

checkDir := s.checkpoints[id]

// open db for read-only
opts := rocksdb.NewDefaultOptions()
Expand Down Expand Up @@ -223,6 +247,13 @@ func (s *RocksDBStore) Backup(w io.Writer, until uint64) error {
}
}

// remove checkpoint from list
// order must be maintained,
delete(s.checkpoints, id)

// clean up only after we succesfully backup
os.RemoveAll(checkDir)

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion storage/rocks/rocksdb_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func TestBackupLoad(t *testing.T) {

// create backup
ioBuf := bytes.NewBufferString("")
require.NoError(t, store.Backup(ioBuf, 0))
id, err := store.Snapshot()
require.Nil(t, err)
require.NoError(t, store.Backup(ioBuf, id))

// restore backup
restore, recloseF := openRocksDBStore(t)
Expand Down
3 changes: 2 additions & 1 deletion storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ type DeletableStore interface {
Store
Delete(prefix byte, key []byte) error
}

type ManagedStore interface {
Store
Backup(w io.Writer, until uint64) error
Snapshot() (uint64, error)
Load(r io.Reader) error
GetLastVersion() (uint64, error)
}

type Mutation struct {
Expand Down

0 comments on commit 7fa84a2

Please sign in to comment.