Skip to content

Commit

Permalink
Merge pull request BBVA#84 from aalda/raft_rocks
Browse files Browse the repository at this point in the history
Raft with Rocksdb store

Former-commit-id: b9cd8d2
  • Loading branch information
aalda authored Mar 19, 2019
2 parents bf81e2d + 76ce0bd commit b43ed0f
Show file tree
Hide file tree
Showing 20 changed files with 1,665 additions and 115 deletions.
6 changes: 3 additions & 3 deletions api/apihttp/apihttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,10 @@ func BenchmarkAuth(b *testing.B) {
}

func newNodeBench(b *testing.B, id int) (*raftwal.RaftBalloon, func()) {
badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id)
rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id)

os.MkdirAll(badgerPath, os.FileMode(0755))
rocks, closeF := storage_utils.OpenRocksDBStore(b, badgerPath)
os.MkdirAll(rocksdbPath, os.FileMode(0755))
rocks, closeF := storage_utils.OpenRocksDBStore(b, rocksdbPath)

raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
os.MkdirAll(raftPath, os.FileMode(0755))
Expand Down
10 changes: 5 additions & 5 deletions raftwal/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestApply(t *testing.T) {
store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db")
store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
Expand All @@ -39,7 +39,7 @@ func TestApply(t *testing.T) {
}

func TestSnapshot(t *testing.T) {
store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db")
store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
Expand All @@ -63,7 +63,7 @@ func (f *fakeRC) Close() error {
}

func TestRestore(t *testing.T) {
store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db")
store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
Expand All @@ -73,7 +73,7 @@ func TestRestore(t *testing.T) {
}

func TestAddAndRestoreSnapshot(t *testing.T) {
store, closeF := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.db")
store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestAddAndRestoreSnapshot(t *testing.T) {
snaps, _ := snap.List()
_, r, _ := snap.Open(snaps[0].ID)

store2, close2F := storage_utils.OpenBadgerStore(t, "/var/tmp/balloon.test.2.db")
store2, close2F := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.2.db")
defer close2F()

// New FSMStore
Expand Down
40 changes: 18 additions & 22 deletions raftwal/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/raftwal/commands"
"github.com/bbva/qed/raftwal/raftrocks"
"github.com/bbva/qed/storage"
raftbadger "github.com/bbva/raft-badger"
"github.com/hashicorp/raft"
)

Expand Down Expand Up @@ -75,10 +75,10 @@ type RaftBalloon struct {
}

store struct {
db storage.ManagedStore // Persistent database
log raft.LogStore // Persistent log store
badgerLog *raftbadger.BadgerStore // Underlying badger-backed persistent log store
stable *raftbadger.BadgerStore // Persistent k-v store
db storage.ManagedStore // Persistent database
log raft.LogStore // Persistent log store
rocksStore *raftrocks.RocksDBStore // Underlying rocksdb-backed persistent log store
//stable *raftrocks.RocksDBStore // Persistent k-v store
snapshots *raft.FileSnapshotStore // Persistent snapstop store
}

Expand All @@ -91,23 +91,23 @@ type RaftBalloon struct {

}

// New returns a new RaftBalloon.
// NewRaftBalloon returns a new RaftBalloon.
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQueue chan *protocol.Snapshot) (*RaftBalloon, error) {

// Create the log store and stable store
badgerLogStore, err := raftbadger.New(raftbadger.Options{Path: path + "/logs", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/logs")
rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true})
if err != nil {
return nil, fmt.Errorf("new badger store: %s", err)
return nil, fmt.Errorf("cannot create a new rocksdb log store: %s", err)
}
logStore, err := raft.NewLogCache(raftLogCacheSize, badgerLogStore)
logStore, err := raft.NewLogCache(raftLogCacheSize, rocksStore)
if err != nil {
return nil, fmt.Errorf("new cached store: %s", err)
return nil, fmt.Errorf("cannot create a new cached store: %s", err)
}

stableStore, err := raftbadger.New(raftbadger.Options{Path: path + "/config", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/config")
if err != nil {
return nil, fmt.Errorf("new badger store: %s", err)
}
// stableStore, err := raftrocks.New(raftrocks.Options{Path: path + "/config", NoSync: true})
// if err != nil {
// return nil, fmt.Errorf("cannot create a new rocksdb stable store: %s", err)
// }

// Instantiate balloon FSM
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, agentsQueue)
Expand All @@ -125,8 +125,7 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQue

rb.store.db = store
rb.store.log = logStore
rb.store.stable = stableStore
rb.store.badgerLog = badgerLogStore
rb.store.rocksStore = rocksStore

return rb, nil
}
Expand Down Expand Up @@ -168,7 +167,7 @@ func (b *RaftBalloon) Open(bootstrap bool, metadata map[string]string) error {
}

// Instantiate the Raft system
b.raft.api, err = raft.NewRaft(b.raft.config, b.fsm, b.store.log, b.store.stable, b.store.snapshots, b.raft.transport)
b.raft.api, err = raft.NewRaft(b.raft.config, b.fsm, b.store.log, b.store.rocksStore, b.store.snapshots, b.raft.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
Expand Down Expand Up @@ -226,15 +225,12 @@ func (b *RaftBalloon) Close(wait bool) error {
}

// close raft store
if err := b.store.badgerLog.Close(); err != nil {
return err
}
if err := b.store.stable.Close(); err != nil {
if err := b.store.rocksStore.Close(); err != nil {
return err
}

b.store.rocksStore = nil
b.store.log = nil
b.store.stable = nil

// Close FSM
b.fsm.Close()
Expand Down
19 changes: 9 additions & 10 deletions raftwal/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/bbva/qed/protocol"

"github.com/bbva/qed/log"
"github.com/bbva/qed/storage/badger"
"github.com/bbva/qed/storage/rocks"
utilrand "github.com/bbva/qed/testutils/rand"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
Expand All @@ -46,21 +46,20 @@ func raftAddr(id int) string {
}

func newNode(t *testing.T, id int) (*RaftBalloon, func()) {
badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id)
dbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/db", id)

err := os.MkdirAll(badgerPath, os.FileMode(0755))
err := os.MkdirAll(dbPath, os.FileMode(0755))
require.NoError(t, err)
badger, err := badger.NewBadgerStore(badgerPath)
db, err := rocks.NewRocksDBStore(dbPath)
require.NoError(t, err)

raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
err = os.MkdirAll(raftPath, os.FileMode(0755))
require.NoError(t, err)
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot, 25000))
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), db, make(chan *protocol.Snapshot, 25000))
require.NoError(t, err)

return r, func() {
fmt.Println("Removing node folder")
os.RemoveAll(fmt.Sprintf("/var/tmp/raft-test/node%d", id))
}
}
Expand Down Expand Up @@ -450,17 +449,17 @@ func mustTempDir() string {
}

func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) {
badgerPath := fmt.Sprintf("/var/tmp/raft-test/node%d/badger", id)
rocksdbPath := fmt.Sprintf("/var/tmp/raft-test/node%d/rocksdb", id)

err := os.MkdirAll(badgerPath, os.FileMode(0755))
err := os.MkdirAll(rocksdbPath, os.FileMode(0755))
require.NoError(b, err)
badger, err := badger.NewBadgerStore(badgerPath)
rocksdb, err := rocks.NewRocksDBStore(rocksdbPath)
require.NoError(b, err)

raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
err = os.MkdirAll(raftPath, os.FileMode(0755))
require.NoError(b, err)
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), badger, make(chan *protocol.Snapshot, 100))
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 100))
require.NoError(b, err)

return r, func() {
Expand Down
104 changes: 104 additions & 0 deletions raftwal/raftrocks/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package raftrocks

import (
"os"
"testing"

"github.com/hashicorp/raft/bench"
)

func BenchmarkRocksDBStore_FirstIndex(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.FirstIndex(b, store)
}

func BenchmarkRocksDBStore_LastIndex(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.LastIndex(b, store)
}

func BenchmarkRocksDBStore_GetLog(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.GetLog(b, store)
}

func BenchmarkRocksDBStore_StoreLog(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.StoreLog(b, store)
}

func BenchmarkRocksDBStore_StoreLogs(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.StoreLogs(b, store)
}

func BenchmarkRocksDBStore_DeleteRange(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.DeleteRange(b, store)
}

func BenchmarkRocksDBStore_Set(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.Set(b, store)
}

func BenchmarkRocksDBStore_Get(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.Get(b, store)
}

func BenchmarkRocksDBStore_SetUint64(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.SetUint64(b, store)
}

func BenchmarkRocksDBStore_GetUint64(b *testing.B) {
store, path := testRocksDBStore(b)
defer store.Close()
defer os.Remove(path)

raftbench.GetUint64(b, store)
}
Loading

0 comments on commit b43ed0f

Please sign in to comment.