Skip to content

Commit

Permalink
Integrate new raft-rocksdb store
Browse files Browse the repository at this point in the history
Former-commit-id: ca8fdad
  • Loading branch information
aalda committed Mar 19, 2019
1 parent c6f9f95 commit 49364a4
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 47 deletions.
6 changes: 3 additions & 3 deletions api/apihttp/apihttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,10 @@ func BenchmarkAuth(b *testing.B) {
}

func newNodeBench(b *testing.B, id int) (*raftwal.RaftBalloon, func()) {
badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id)
rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id)

os.MkdirAll(badgerPath, os.FileMode(0755))
rocks, closeF := storage_utils.OpenRocksDBStore(b, badgerPath)
os.MkdirAll(rocksdbPath, os.FileMode(0755))
rocks, closeF := storage_utils.OpenRocksDBStore(b, rocksdbPath)

raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
os.MkdirAll(raftPath, os.FileMode(0755))
Expand Down
10 changes: 5 additions & 5 deletions raftwal/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestApply(t *testing.T) {
store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db")
store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
Expand All @@ -39,7 +39,7 @@ func TestApply(t *testing.T) {
}

func TestSnapshot(t *testing.T) {
store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db")
store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
Expand All @@ -63,7 +63,7 @@ func (f *fakeRC) Close() error {
}

func TestRestore(t *testing.T) {
store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db")
store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
Expand All @@ -73,7 +73,7 @@ func TestRestore(t *testing.T) {
}

func TestAddAndRestoreSnapshot(t *testing.T) {
store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db")
store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestAddAndRestoreSnapshot(t *testing.T) {
snaps, _ := snap.List()
_, r, _ := snap.Open(snaps[0].ID)

store2, close2F := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.2.db")
store2, close2F := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.2.db")
defer close2F()

// New FSMStore
Expand Down
40 changes: 18 additions & 22 deletions raftwal/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/raftwal/commands"
"github.com/bbva/qed/raftwal/raftrocks"
"github.com/bbva/qed/storage"
raftbadger "github.com/bbva/raft-badger"
"github.com/hashicorp/raft"
)

Expand Down Expand Up @@ -75,10 +75,10 @@ type RaftBalloon struct {
}

store struct {
db storage.ManagedStore // Persistent database
log raft.LogStore // Persistent log store
badgerLog *raftbadger.BadgerStore // Underlying badger-backed persistent log store
stable *raftbadger.BadgerStore // Persistent k-v store
db storage.ManagedStore // Persistent database
log raft.LogStore // Persistent log store
rocksStore *raftrocks.RocksDBStore // Underlying rocksdb-backed persistent log store
//stable *raftrocks.RocksDBStore // Persistent k-v store
snapshots *raft.FileSnapshotStore // Persistent snapstop store
}

Expand All @@ -91,23 +91,23 @@ type RaftBalloon struct {

}

// New returns a new RaftBalloon.
// NewRaftBalloon returns a new RaftBalloon.
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQueue chan *protocol.Snapshot) (*RaftBalloon, error) {

// Create the log store and stable store
badgerLogStore, err := raftbadger.New(raftbadger.Options{Path: path + "/logs", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/logs")
rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true})
if err != nil {
return nil, fmt.Errorf("new badger store: %s", err)
return nil, fmt.Errorf("cannot create a new rocksdb log store: %s", err)
}
logStore, err := raft.NewLogCache(raftLogCacheSize, badgerLogStore)
logStore, err := raft.NewLogCache(raftLogCacheSize, rocksStore)
if err != nil {
return nil, fmt.Errorf("new cached store: %s", err)
return nil, fmt.Errorf("cannot create a new cached store: %s", err)
}

stableStore, err := raftbadger.New(raftbadger.Options{Path: path + "/config", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/config")
if err != nil {
return nil, fmt.Errorf("new badger store: %s", err)
}
// stableStore, err := raftrocks.New(raftrocks.Options{Path: path + "/config", NoSync: true})
// if err != nil {
// return nil, fmt.Errorf("cannot create a new rocksdb stable store: %s", err)
// }

// Instantiate balloon FSM
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, agentsQueue)
Expand All @@ -125,8 +125,7 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQue

rb.store.db = store
rb.store.log = logStore
rb.store.stable = stableStore
rb.store.badgerLog = badgerLogStore
rb.store.rocksStore = rocksStore

return rb, nil
}
Expand Down Expand Up @@ -168,7 +167,7 @@ func (b *RaftBalloon) Open(bootstrap bool, metadata map[string]string) error {
}

// Instantiate the Raft system
b.raft.api, err = raft.NewRaft(b.raft.config, b.fsm, b.store.log, b.store.stable, b.store.snapshots, b.raft.transport)
b.raft.api, err = raft.NewRaft(b.raft.config, b.fsm, b.store.log, b.store.rocksStore, b.store.snapshots, b.raft.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
Expand Down Expand Up @@ -226,15 +225,12 @@ func (b *RaftBalloon) Close(wait bool) error {
}

// close raft store
if err := b.store.badgerLog.Close(); err != nil {
return err
}
if err := b.store.stable.Close(); err != nil {
if err := b.store.rocksStore.Close(); err != nil {
return err
}

b.store.rocksStore = nil
b.store.log = nil
b.store.stable = nil

// Close FSM
b.fsm.Close()
Expand Down
19 changes: 9 additions & 10 deletions raftwal/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/bbva/qed/protocol"

"github.com/bbva/qed/log"
"github.com/bbva/qed/storage/badger"
"github.com/bbva/qed/storage/rocks"
utilrand "github.com/bbva/qed/testutils/rand"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
Expand All @@ -46,21 +46,20 @@ func raftAddr(id int) string {
}

func newNode(t *testing.T, id int) (*RaftBalloon, func()) {
badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id)
dbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/db", id)

err := os.MkdirAll(badgerPath, os.FileMode(0755))
err := os.MkdirAll(dbPath, os.FileMode(0755))
require.NoError(t, err)
badger, err := badger.NewBadgerStore(badgerPath)
db, err := rocks.NewRocksDBStore(dbPath)
require.NoError(t, err)

raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
err = os.MkdirAll(raftPath, os.FileMode(0755))
require.NoError(t, err)
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot, 25000))
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), db, make(chan *protocol.Snapshot, 25000))
require.NoError(t, err)

return r, func() {
fmt.Println("Removing node folder")
os.RemoveAll(fmt.Sprintf("/var/tmp/raft-test/node%d", id))
}
}
Expand Down Expand Up @@ -450,17 +449,17 @@ func mustTempDir() string {
}

func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) {
badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id)
rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id)

err := os.MkdirAll(badgerPath, os.FileMode(0755))
err := os.MkdirAll(rocksdbPath, os.FileMode(0755))
require.NoError(b, err)
badger, err := badger.NewBadgerStore(badgerPath)
rocksdb, err := rocks.NewRocksDBStore(rocksdbPath)
require.NoError(b, err)

raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
err = os.MkdirAll(raftPath, os.FileMode(0755))
require.NoError(b, err)
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot, 100))
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 100))
require.NoError(b, err)

return r, func() {
Expand Down
9 changes: 3 additions & 6 deletions raftwal/raftrocks/rocksdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

var (
// ErrKeyNotFound is an error indicating a given key does not exist
ErrKeyNotFound = errors.New("key not found")
ErrKeyNotFound = errors.New("not found")
)

const (
Expand Down Expand Up @@ -66,10 +66,7 @@ type RocksDBStore struct {
type Options struct {
// Path is the directory path to the RocksDB instance to use.
Path string
// TODO decide if we should use a diferent directory for the Rocks WAl

// BadgerOptions contains any specific RocksDB options you might
// want to specify.
// TODO decide if we should use a diferent directory for the Rocks WAL

// NoSync causes the database to skip fsync calls after each
// write to the log. This is unsafe, so it should be used
Expand Down Expand Up @@ -308,7 +305,7 @@ func (s *RocksDBStore) SetUint64(key []byte, val uint64) error {
}

// GetUint64 is like Get, but handles uint64 values
func (s *RocksDBStore) GetUint64(key []byte) (uint64, error) { // TODO use GEt instead getBytes
func (s *RocksDBStore) GetUint64(key []byte) (uint64, error) {
val, err := s.Get(key)
if err != nil {
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewServer(conf *Config) (*Server, error) {
return nil, err
}

// Open badger store
// Open RocksDB store
store, err := rocks.NewRocksDBStore(conf.DBPath)
if err != nil {
return nil, err
Expand Down

0 comments on commit 49364a4

Please sign in to comment.