Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move agents queue from FSM to RaftBalloon #98

Merged
merged 1 commit into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions raftwal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"io"
"sync"

"github.com/bbva/qed/protocol"

"github.com/bbva/qed/balloon"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
Expand All @@ -51,8 +49,6 @@ type BalloonFSM struct {
balloon *balloon.Balloon
state *fsmState

agentsQueue chan *protocol.Snapshot

metaMu sync.RWMutex
meta map[string]map[string]string

Expand All @@ -74,7 +70,7 @@ func loadState(s storage.ManagedStore) (*fsmState, error) {
return &state, err
}

func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, agentsQueue chan *protocol.Snapshot) (*BalloonFSM, error) {
func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher) (*BalloonFSM, error) {

b, err := balloon.NewBalloon(store, hasherF)
if err != nil {
Expand All @@ -87,12 +83,11 @@ func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, ag
}

return &BalloonFSM{
hasherF: hasherF,
store: store,
balloon: b,
state: state,
agentsQueue: agentsQueue,
meta: make(map[string]map[string]string),
hasherF: hasherF,
store: store,
balloon: b,
state: state,
meta: make(map[string]map[string]string),
}, nil
}

Expand Down Expand Up @@ -262,14 +257,6 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse {
}
fsm.state = state

//Send snapshot to gossip agents
fsm.agentsQueue <- &protocol.Snapshot{
HistoryDigest: snapshot.HistoryDigest,
HyperDigest: snapshot.HyperDigest,
Version: snapshot.Version,
EventDigest: snapshot.EventDigest,
}

return &fsmAddResponse{snapshot: snapshot}
}

Expand Down
24 changes: 18 additions & 6 deletions raftwal/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@ import (
assert "github.com/stretchr/testify/require"

"github.com/bbva/qed/hashing"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/log"
"github.com/bbva/qed/raftwal/commands"
storage_utils "github.com/bbva/qed/testutils/storage"
)

func TestApply(t *testing.T) {

log.SetLogger("TestApply", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

// happy path
Expand All @@ -39,10 +42,13 @@ func TestApply(t *testing.T) {
}

func TestSnapshot(t *testing.T) {

log.SetLogger("TestSnapshot", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

fsm.Apply(newRaftLog(0, 0))
Expand All @@ -63,20 +69,26 @@ func (f *fakeRC) Close() error {
}

func TestRestore(t *testing.T) {

log.SetLogger("TestRestore", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

assert.NoError(t, fsm.Restore(&fakeRC{}))
}

func TestAddAndRestoreSnapshot(t *testing.T) {

log.SetLogger("TestAddAndRestoreSnapshot", log.SILENT)

store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db")
defer closeF()

fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
assert.NoError(t, err)

fsm.Apply(newRaftLog(0, 0))
Expand Down Expand Up @@ -108,7 +120,7 @@ func TestAddAndRestoreSnapshot(t *testing.T) {
defer close2F()

// New FSMStore
fsm2, err := NewBalloonFSM(store2, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100))
fsm2, err := NewBalloonFSM(store2, hashing.NewSha256Hasher)
assert.NoError(t, err)

err = fsm2.Restore(r)
Expand Down
30 changes: 21 additions & 9 deletions raftwal/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ type RaftBalloon struct {
wg sync.WaitGroup
done chan struct{}

fsm *BalloonFSM // balloon's finite state machine
fsm *BalloonFSM // balloon's finite state machine
snapshotsCh chan *protocol.Snapshot // channel to publish snapshots

}

// NewRaftBalloon returns a new RaftBalloon.
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQueue chan *protocol.Snapshot) (*RaftBalloon, error) {
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot) (*RaftBalloon, error) {

// Create the log store and stable store
rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true})
Expand All @@ -110,17 +111,18 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQue
// }

// Instantiate balloon FSM
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, agentsQueue)
fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher)
if err != nil {
return nil, fmt.Errorf("new balloon fsm: %s", err)
}

rb := &RaftBalloon{
path: path,
addr: addr,
id: id,
done: make(chan struct{}),
fsm: fsm,
path: path,
addr: addr,
id: id,
done: make(chan struct{}),
fsm: fsm,
snapshotsCh: snapshotsCh,
}

rb.store.db = store
Expand Down Expand Up @@ -367,7 +369,17 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) {
if err != nil {
return nil, err
}
return resp.(*fsmAddResponse).snapshot, nil
snapshot := resp.(*fsmAddResponse).snapshot

//Send snapshot to the snapshot channel
b.snapshotsCh <- &protocol.Snapshot{ // TODO move this to an upper layer (shard manager?)
HistoryDigest: snapshot.HistoryDigest,
HyperDigest: snapshot.HyperDigest,
Version: snapshot.Version,
EventDigest: snapshot.EventDigest,
}

return snapshot, nil
}

func (b *RaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) {
Expand Down
2 changes: 1 addition & 1 deletion raftwal/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) {
raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id)
err = os.MkdirAll(raftPath, os.FileMode(0755))
require.NoError(b, err)
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 100))
r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 10000))
require.NoError(b, err)

return r, func() {
Expand Down
10 changes: 5 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Server struct {
signer sign.Signer
sender *sender.Sender
agent *gossip.Agent
agentsQueue chan *protocol.Snapshot
snapshotsCh chan *protocol.Snapshot
}

func serverInfo(conf *Config) http.HandlerFunc {
Expand Down Expand Up @@ -146,14 +146,14 @@ func NewServer(conf *Config) (*Server, error) {
}

// TODO: add queue size to config
server.agentsQueue = make(chan *protocol.Snapshot, 2<<16)
server.snapshotsCh = make(chan *protocol.Snapshot, 2<<16)

// Create sender
server.sender = sender.NewSender(server.agent, sender.DefaultConfig(), server.signer)
server.sender.RegisterMetrics(server.metricsServer)

// Create RaftBalloon
server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.agentsQueue)
server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.snapshotsCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func (s *Server) Start() error {

go func() {
log.Debug(" * Starting QED gossip agent.")
s.sender.Start(s.agentsQueue)
s.sender.Start(s.snapshotsCh)
}()

util.AwaitTermSignal(s.Stop)
Expand Down Expand Up @@ -293,7 +293,7 @@ func (s *Server) Stop() error {

log.Debugf("Closing QED sender...")
s.sender.Stop()
close(s.agentsQueue)
close(s.snapshotsCh)

log.Debugf("Stopping QED agent...")
if err := s.agent.Shutdown(); err != nil {
Expand Down