diff --git a/raftwal/fsm.go b/raftwal/fsm.go index a68ad8e8b..2e6e5c549 100644 --- a/raftwal/fsm.go +++ b/raftwal/fsm.go @@ -24,8 +24,6 @@ import ( "io" "sync" - "github.com/bbva/qed/protocol" - "github.com/bbva/qed/balloon" "github.com/bbva/qed/hashing" "github.com/bbva/qed/log" @@ -51,8 +49,6 @@ type BalloonFSM struct { balloon *balloon.Balloon state *fsmState - agentsQueue chan *protocol.Snapshot - metaMu sync.RWMutex meta map[string]map[string]string @@ -74,7 +70,7 @@ func loadState(s storage.ManagedStore) (*fsmState, error) { return &state, err } -func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, agentsQueue chan *protocol.Snapshot) (*BalloonFSM, error) { +func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher) (*BalloonFSM, error) { b, err := balloon.NewBalloon(store, hasherF) if err != nil { @@ -87,12 +83,11 @@ func NewBalloonFSM(store storage.ManagedStore, hasherF func() hashing.Hasher, ag } return &BalloonFSM{ - hasherF: hasherF, - store: store, - balloon: b, - state: state, - agentsQueue: agentsQueue, - meta: make(map[string]map[string]string), + hasherF: hasherF, + store: store, + balloon: b, + state: state, + meta: make(map[string]map[string]string), }, nil } @@ -262,14 +257,6 @@ func (fsm *BalloonFSM) applyAdd(event []byte, state *fsmState) *fsmAddResponse { } fsm.state = state - //Send snapshot to gossip agents - fsm.agentsQueue <- &protocol.Snapshot{ - HistoryDigest: snapshot.HistoryDigest, - HyperDigest: snapshot.HyperDigest, - Version: snapshot.Version, - EventDigest: snapshot.EventDigest, - } - return &fsmAddResponse{snapshot: snapshot} } diff --git a/raftwal/fsm_test.go b/raftwal/fsm_test.go index b3e6407d5..c1835f09c 100644 --- a/raftwal/fsm_test.go +++ b/raftwal/fsm_test.go @@ -8,16 +8,19 @@ import ( assert "github.com/stretchr/testify/require" "github.com/bbva/qed/hashing" - "github.com/bbva/qed/protocol" + "github.com/bbva/qed/log" "github.com/bbva/qed/raftwal/commands" storage_utils "github.com/bbva/qed/testutils/storage" ) func TestApply(t *testing.T) { + + log.SetLogger("TestApply", log.SILENT) + store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db") defer closeF() - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) assert.NoError(t, err) // happy path @@ -39,10 +42,13 @@ func TestApply(t *testing.T) { } func TestSnapshot(t *testing.T) { + + log.SetLogger("TestSnapshot", log.SILENT) + store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db") defer closeF() - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) assert.NoError(t, err) fsm.Apply(newRaftLog(0, 0)) @@ -63,20 +69,26 @@ func (f *fakeRC) Close() error { } func TestRestore(t *testing.T) { + + log.SetLogger("TestRestore", log.SILENT) + store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db") defer closeF() - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) assert.NoError(t, err) assert.NoError(t, fsm.Restore(&fakeRC{})) } func TestAddAndRestoreSnapshot(t *testing.T) { + + log.SetLogger("TestAddAndRestoreSnapshot", log.SILENT) + store, closeF := storage_utils.OpenRocksDBStore(t, "/var/tmp/balloon.test.db") defer closeF() - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) assert.NoError(t, err) fsm.Apply(newRaftLog(0, 0)) @@ -108,7 +120,7 @@ func TestAddAndRestoreSnapshot(t *testing.T) { defer close2F() // New FSMStore - fsm2, err := NewBalloonFSM(store2, hashing.NewSha256Hasher, make(chan *protocol.Snapshot, 100)) + fsm2, err := NewBalloonFSM(store2, hashing.NewSha256Hasher) assert.NoError(t, err) err = fsm2.Restore(r) diff --git a/raftwal/raft.go b/raftwal/raft.go index 37e8f0040..046bb8cd3 100644 --- a/raftwal/raft.go +++ b/raftwal/raft.go @@ -87,12 +87,13 @@ type RaftBalloon struct { wg sync.WaitGroup done chan struct{} - fsm *BalloonFSM // balloon's finite state machine + fsm *BalloonFSM // balloon's finite state machine + snapshotsCh chan *protocol.Snapshot // channel to publish snapshots } // NewRaftBalloon returns a new RaftBalloon. -func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQueue chan *protocol.Snapshot) (*RaftBalloon, error) { +func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, snapshotsCh chan *protocol.Snapshot) (*RaftBalloon, error) { // Create the log store and stable store rocksStore, err := raftrocks.New(raftrocks.Options{Path: path + "/wal", NoSync: true}) @@ -110,17 +111,18 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore, agentsQue // } // Instantiate balloon FSM - fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher, agentsQueue) + fsm, err := NewBalloonFSM(store, hashing.NewSha256Hasher) if err != nil { return nil, fmt.Errorf("new balloon fsm: %s", err) } rb := &RaftBalloon{ - path: path, - addr: addr, - id: id, - done: make(chan struct{}), - fsm: fsm, + path: path, + addr: addr, + id: id, + done: make(chan struct{}), + fsm: fsm, + snapshotsCh: snapshotsCh, } rb.store.db = store @@ -367,7 +369,17 @@ func (b *RaftBalloon) Add(event []byte) (*balloon.Snapshot, error) { if err != nil { return nil, err } - return resp.(*fsmAddResponse).snapshot, nil + snapshot := resp.(*fsmAddResponse).snapshot + + //Send snapshot to the snapshot channel + b.snapshotsCh <- &protocol.Snapshot{ // TODO move this to an upper layer (shard manager?) + HistoryDigest: snapshot.HistoryDigest, + HyperDigest: snapshot.HyperDigest, + Version: snapshot.Version, + EventDigest: snapshot.EventDigest, + } + + return snapshot, nil } func (b *RaftBalloon) QueryDigestMembership(keyDigest hashing.Digest, version uint64) (*balloon.MembershipProof, error) { diff --git a/raftwal/raft_test.go b/raftwal/raft_test.go index f3cd4b91e..eb64ef778 100644 --- a/raftwal/raft_test.go +++ b/raftwal/raft_test.go @@ -459,7 +459,7 @@ func newNodeBench(b *testing.B, id int) (*RaftBalloon, func()) { raftPath := fmt.Sprintf("/var/tmp/raft-test/node%d/raft", id) err = os.MkdirAll(raftPath, os.FileMode(0755)) require.NoError(b, err) - r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 100)) + r, err := NewRaftBalloon(raftPath, raftAddr(id), fmt.Sprintf("%d", id), rocksdb, make(chan *protocol.Snapshot, 10000)) require.NoError(b, err) return r, func() { diff --git a/server/server.go b/server/server.go index 0284e6ff0..95c071289 100644 --- a/server/server.go +++ b/server/server.go @@ -59,7 +59,7 @@ type Server struct { signer sign.Signer sender *sender.Sender agent *gossip.Agent - agentsQueue chan *protocol.Snapshot + snapshotsCh chan *protocol.Snapshot } func serverInfo(conf *Config) http.HandlerFunc { @@ -146,14 +146,14 @@ func NewServer(conf *Config) (*Server, error) { } // TODO: add queue size to config - server.agentsQueue = make(chan *protocol.Snapshot, 2<<16) + server.snapshotsCh = make(chan *protocol.Snapshot, 2<<16) // Create sender server.sender = sender.NewSender(server.agent, sender.DefaultConfig(), server.signer) server.sender.RegisterMetrics(server.metricsServer) // Create RaftBalloon - server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.agentsQueue) + server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.snapshotsCh) if err != nil { return nil, err } @@ -254,7 +254,7 @@ func (s *Server) Start() error { go func() { log.Debug(" * Starting QED gossip agent.") - s.sender.Start(s.agentsQueue) + s.sender.Start(s.snapshotsCh) }() util.AwaitTermSignal(s.Stop) @@ -293,7 +293,7 @@ func (s *Server) Stop() error { log.Debugf("Closing QED sender...") s.sender.Stop() - close(s.agentsQueue) + close(s.snapshotsCh) log.Debugf("Stopping QED agent...") if err := s.agent.Shutdown(); err != nil {