Skip to content

Commit

Permalink
Move agents queue from FSM to RaftBalloon
Browse files Browse the repository at this point in the history
closes #94
  • Loading branch information
aalda committed Apr 2, 2019
1 parent f8a8bae commit 33035b4
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 40 deletions.
25 changes: 6 additions & 19 deletions raftwal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"io"
"sync"

"github.com/bbva/qed/protocol"

"github.com/bbva/qed/balloon"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
Expand All @@ -51,8 +49,6 @@ type BalloonFSM struct {
balloon *balloon.Balloon
state *fsmState

agentsQueue chan *protocol.Snapshot

metaMu sync.RWMutex
meta map[string]map[string]string

Expand All @@ -74,7 +70,7 @@ func loadState(s storage.ManagedStore) (*fsmState, error) {
return &state, err
}

func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, agentsQueue chan *protocol.Snapshot) (*BalloonFSM, error) {
func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher) (*BalloonFSM, error) {

b, err := balloon.NewBalloon(store, hasherF)
if err != nil {
Expand All @@ -87,12 +83,11 @@ func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, ag
}

return &BalloonFSM{
hasherF: hasherF,
store: store,
balloon: b,
state: state,
agentsQueue: agentsQueue,
meta: make(map[string]map[string]string),
hasherF: hasherF,
store: store,
balloon: b,
state: state,
meta: make(map[string]map[string]string),
}, nil
}

Expand Down Expand Up @@ -262,14 +257,6 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse {
}
fsm.state = state

//Send snapshot to gossip agents
fsm.agentsQueue <- &protocol.Snapshot{
HistoryDigest: snapshot.HistoryDigest,
HyperDigest: snapshot.HyperDigest,
Version: snapshot.Version,
EventDigest: snapshot.EventDigest,
}

return &fsmAddResponse{snapshot: snapshot}
}

Expand Down
24 changes: 18 additions & 6 deletions raftwal/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@ import (
assert "github.com/stretchr/testify/require"

"github.com/bbva/qed/hashing"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/log"
"github.com/bbva/qed/raftwal/commands"
storage_utils "github.com/bbva/qed/testutils/storage"
)

func TestApply(t *testing.T) {

log.SetLogger("TestApply", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

// happy path
Expand All @@ -39,10 +42,13 @@ func TestApply(t *testing.T) {
}

func TestSnapshot(t *testing.T) {

log.SetLogger("TestSnapshot", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

fsm.Apply(newRaftLog(0, 0))
Expand All @@ -63,20 +69,26 @@ func (f *fakeRC) Close() error {
}

func TestRestore(t *testing.T) {

log.SetLogger("TestRestore", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

assert.NoError(t, fsm.Restore(&fakeRC{}))
}

func TestAddAndRestoreSnapshot(t *testing.T) {

log.SetLogger("TestAddAndRestoreSnapshot", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

fsm.Apply(newRaftLog(0, 0))
Expand Down Expand Up @@ -108,7 +120,7 @@ func TestAddAndRestoreSnapshot(t *testing.T) {
defer close2F()

// New FSMStore
fsm2, err := NewBalloonFSM(store2, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm2, err := NewBalloonFSM(store2, hashing.NewSha256Hasher)
assert.NoError(t, err)

err = fsm2.Restore(r)
Expand Down
30 changes: 21 additions & 9 deletions raftwal/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ type RaftBalloon struct {
wg sync.WaitGroup
done chan struct{}

fsm *BalloonFSM // balloon's finite state machine
fsm *BalloonFSM // balloon's finite state machine
snapshotsCh chan *protocol.Snapshot // channel to publish snapshots

}

// NewRaftBalloon returns a new RaftBalloon.
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQueue chan *protocol.Snapshot) (*RaftBalloon, error) {
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot) (*RaftBalloon, error) {

// Create the log store and stable store
rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true})
Expand All @@ -110,17 +111,18 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQue
// }

// Instantiate balloon FSM
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, agentsQueue)
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
if err != nil {
return nil, fmt.Errorf("new balloon fsm: %s", err)
}

rb := &RaftBalloon{
path: path,
addr: addr,
id: id,
done: make(chan struct{}),
fsm: fsm,
path: path,
addr: addr,
id: id,
done: make(chan struct{}),
fsm: fsm,
snapshotsCh: snapshotsCh,
}

rb.store.db = store
Expand Down Expand Up @@ -367,7 +369,17 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) {
if err != nil {
return nil, err
}
return resp.(*fsmAddResponse).snapshot, nil
snapshot := resp.(*fsmAddResponse).snapshot

//Send snapshot to the snapshot channel
b.snapshotsCh <- &protocol.Snapshot{ // TODO move this to an upper layer (shard manager?)
HistoryDigest: snapshot.HistoryDigest,
HyperDigest: snapshot.HyperDigest,
Version: snapshot.Version,
EventDigest: snapshot.EventDigest,
}

return snapshot, nil
}

func (b *RaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) {
Expand Down
2 changes: 1 addition & 1 deletion raftwal/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) {
raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
err = os.MkdirAll(raftPath, os.FileMode(0755))
require.NoError(b, err)
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 100))
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 10000))
require.NoError(b, err)

return r, func() {
Expand Down
10 changes: 5 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Server struct {
signer sign.Signer
sender *sender.Sender
agent *gossip.Agent
agentsQueue chan *protocol.Snapshot
snapshotsCh chan *protocol.Snapshot
}

func serverInfo(conf *Config) http.HandlerFunc {
Expand Down Expand Up @@ -146,14 +146,14 @@ func NewServer(conf *Config) (*Server, error) {
}

// TODO: add queue size to config
server.agentsQueue = make(chan *protocol.Snapshot, 2<<16)
server.snapshotsCh = make(chan *protocol.Snapshot, 2<<16)

// Create sender
server.sender = sender.NewSender(server.agent, sender.DefaultConfig(), server.signer)
server.sender.RegisterMetrics(server.metricsServer)

// Create RaftBalloon
server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.agentsQueue)
server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.snapshotsCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func (s *Server) Start() error {

go func() {
log.Debug(" * Starting QED gossip agent.")
s.sender.Start(s.agentsQueue)
s.sender.Start(s.snapshotsCh)
}()

util.AwaitTermSignal(s.Stop)
Expand Down Expand Up @@ -293,7 +293,7 @@ func (s *Server) Stop() error {

log.Debugf("Closing QED sender...")
s.sender.Stop()
close(s.agentsQueue)
close(s.snapshotsCh)

log.Debugf("Stopping QED agent...")
if err := s.agent.Shutdown(); err != nil {
Expand Down

0 comments on commit 33035b4

Please sign in to comment.