From 5eb5d07b7ab2fc03351d9a0c15b6f70ee4bbc135 Mon Sep 17 00:00:00 2001 From: Gennady Laventman Date: Mon, 27 Mar 2017 10:37:19 +0300 Subject: [PATCH] [FAB-2780] Expire leadership, stateInfo and data Taking care of leadership messages store, state info cache and blocks message store in channel Change-Id: I9122fd7569ad156b248b269d8ab4016b7e22d176 Signed-off-by: Gennady Laventman --- gossip/election/election.go | 4 + gossip/gossip/channel/channel.go | 55 +++++++-- gossip/gossip/channel/channel_test.go | 154 ++++++++++++++++++++++++-- gossip/gossip/chanstate.go | 14 ++- gossip/gossip/gossip_impl.go | 5 +- 5 files changed, 206 insertions(+), 26 deletions(-) diff --git a/gossip/election/election.go b/gossip/election/election.go index 2fcb7eefdf5..7805f7f5d81 100644 --- a/gossip/election/election.go +++ b/gossip/election/election.go @@ -418,3 +418,7 @@ func getLeadershipDeclarationInterval() time.Duration { func getLeaderElectionDuration() time.Duration { return util.GetDurationOrDefault("peer.gossip.election.leaderElectionDuration", time.Second*5) } + +func GetMsgExpirationTimeout() time.Duration { + return getLeaderAliveThreshold() * 10 +} diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index cd37ae91829..5b0e67e7559 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -28,6 +28,7 @@ import ( "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/election" "github.com/hyperledger/fabric/gossip/filter" "github.com/hyperledger/fabric/gossip/gossip/msgstore" "github.com/hyperledger/fabric/gossip/gossip/pull" @@ -45,6 +46,9 @@ type Config struct { PullPeerNum int PullInterval time.Duration RequestStateInfoInterval time.Duration + + BlockExpirationInterval time.Duration + StateInfoExpirationInterval time.Duration } // GossipChannel defines an object that deals with all channel-related 
messages @@ -166,13 +170,22 @@ func NewGossipChannel(pkiID common.PKIidType, mcs api.MessageCryptoService, chai gc.memFilter = &membershipFilter{adapter: gc.Adapter, gossipChannel: gc} comparator := proto.NewGossipMessageComparator(adapter.GetConf().MaxBlockCountToStore) - gc.blockMsgStore = msgstore.NewMessageStore(comparator, func(m interface{}) { + + gc.blocksPuller = gc.createBlockPuller() + + gc.blockMsgStore = msgstore.NewMessageStoreExpirable(comparator, func(m interface{}) { + gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage)) + }, gc.GetConf().BlockExpirationInterval, nil, nil, func(m interface{}) { gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage)) }) - gc.stateInfoMsgStore = newStateInfoCache() - gc.blocksPuller = gc.createBlockPuller() - gc.leaderMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}) + gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoExpirationInterval) + + ttl := election.GetMsgExpirationTimeout() + noopFunc := func(m interface{}) {} + pol := proto.NewGossipMessageComparator(0) + + gc.leaderMsgStore = msgstore.NewMessageStoreExpirable(pol, noopFunc, ttl, nil, nil, nil) gc.ConfigureChannel(joinMsg) @@ -189,6 +202,9 @@ func (gc *gossipChannel) Stop() { gc.blocksPuller.Stop() gc.stateInfoPublishScheduler.Stop() gc.stateInfoRequestScheduler.Stop() + gc.leaderMsgStore.Stop() + gc.stateInfoMsgStore.Stop() + gc.blockMsgStore.Stop() } func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) { @@ -586,16 +602,31 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) { atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1)) } -// NewStateInfoMessageStore returns a MessageStore -func NewStateInfoMessageStore() msgstore.MessageStore { - return msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}) +// NewStateInfoMessageStore returns a expirable MessageStore +// ttl is time duration before msg expires and removed 
from store +func NewStateInfoMessageStore(ttl time.Duration) msgstore.MessageStore { + return NewStateInfoMessageStoreWithCallback(ttl, nil) } -func newStateInfoCache() *stateInfoCache { - return &stateInfoCache{ - MembershipStore: util.NewMembershipStore(), - MessageStore: NewStateInfoMessageStore(), +// NewStateInfoMessageStoreWithCallback returns an expirable MessageStore +// Callback is invoked once a message expires and is removed from the store +// ttl is time duration before msg expires +func NewStateInfoMessageStoreWithCallback(ttl time.Duration, callback func(m interface{})) msgstore.MessageStore { + pol := proto.NewGossipMessageComparator(0) + noopTrigger := func(m interface{}) {} + return msgstore.NewMessageStoreExpirable(pol, noopTrigger, ttl, nil, nil, callback) +} + +func newStateInfoCache(ttl time.Duration) *stateInfoCache { + membershipStore := util.NewMembershipStore() + callback := func(m interface{}) { + membershipStore.Remove(m.(*proto.SignedGossipMessage).GetStateInfo().PkiId) + } + s := &stateInfoCache{ + MembershipStore: membershipStore, + MessageStore: NewStateInfoMessageStoreWithCallback(ttl, callback), } + return s } // stateInfoCache is actually a messageStore @@ -611,8 +642,8 @@ type stateInfoCache struct { // Message must be a StateInfo message. 
func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool { added := cache.MessageStore.Add(msg) - pkiID := msg.GetStateInfo().PkiId if added { + pkiID := msg.GetStateInfo().PkiId cache.MembershipStore.Put(pkiID, msg) } return added diff --git a/gossip/gossip/channel/channel_test.go b/gossip/gossip/channel/channel_test.go index b9abd4208cf..b876b042636 100644 --- a/gossip/gossip/channel/channel_test.go +++ b/gossip/gossip/channel/channel_test.go @@ -40,11 +40,13 @@ type msgMutator func(message *proto.Envelope) var conf = Config{ ID: "test", - PublishStateInfoInterval: time.Millisecond * 100, - MaxBlockCountToStore: 100, - PullPeerNum: 3, - PullInterval: time.Second, - RequestStateInfoInterval: time.Millisecond * 100, + PublishStateInfoInterval: time.Millisecond * 100, + MaxBlockCountToStore: 100, + PullPeerNum: 3, + PullInterval: time.Second, + RequestStateInfoInterval: time.Millisecond * 100, + BlockExpirationInterval: time.Second * 6, + StateInfoExpirationInterval: time.Second * 6, } func init() { @@ -275,7 +277,8 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) { }) gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{}) - gc.UpdateStateInfo(createStateInfoMsg(ledgerHeight, pkiIDInOrg1, channelA)) + stateInfoMsg := createStateInfoMsg(ledgerHeight, pkiIDInOrg1, channelA) + gc.UpdateStateInfo(stateInfoMsg) var msg *proto.SignedGossipMessage select { @@ -289,6 +292,14 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) { height, err := strconv.ParseInt(string(md), 10, 64) assert.NoError(t, err, "ReceivedMetadata is invalid") assert.Equal(t, ledgerHeight, int(height), "Received different ledger height than expected") + + // We will not update StateInfo in store, so store will become empty + time.Sleep(conf.StateInfoExpirationInterval + time.Second) + //Store is empty + assert.Equal(t, 0, gc.(*gossipChannel).stateInfoMsgStore.MessageStore.Size(), "StateInfo MessageStore should be empty") + assert.Equal(t, 0, 
gc.(*gossipChannel).stateInfoMsgStore.MembershipStore.Size(), "StateInfo MembershipStore should be empty") + + gc.Stop() } func TestChannelPull(t *testing.T) { @@ -523,6 +534,99 @@ func TestChannelAddToMessageStore(t *testing.T) { assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1})) } +func TestChannelAddToMessageStoreExpire(t *testing.T) { + t.Parallel() + + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + demuxedMsgs := make(chan *proto.SignedGossipMessage, 1) + adapter := new(gossipAdapterMock) + configureAdapter(adapter) + gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{}) + adapter.On("Gossip", mock.Anything) + adapter.On("Send", mock.Anything, mock.Anything) + adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { + demuxedMsgs <- arg.Get(0).(*proto.SignedGossipMessage) + }) + + respondedChan := make(chan *proto.GossipMessage, 1) + messageRelayer := func(arg mock.Arguments) { + msg := arg.Get(0).(*proto.GossipMessage) + respondedChan <- msg + } + + // We make sure that if we get a new message it is de-multiplexed, + gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(11, channelA), PKIID: pkiIDInOrg1}) + select { + case <-time.After(time.Second): + t.Fatal("Haven't detected a demultiplexing within a time period") + case <-demuxedMsgs: + } + + // Lets check digests and state info store + stateInfoMsg := createStateInfoMsg(10, pkiIDInOrg1, channelA) + gc.AddToMsgStore(stateInfoMsg) + helloMsg := createHelloMsg(pkiIDInOrg1) + helloMsg.On("Respond", mock.Anything).Run(messageRelayer) + gc.HandleMessage(helloMsg) + select { + case <-time.After(time.Second): + t.Fatal("Haven't responded to hello message within a time period") + case msg := <-respondedChan: + if msg.IsDigestMsg() { + assert.Equal(t, 1, len(msg.GetDataDig().Digests), "Number of digests returned by channel blockPuller incorrect") + } else { + t.Fatal("Not correct pull msg type in responce - expect digest") 
+ } + } + + time.Sleep(gc.(*gossipChannel).GetConf().BlockExpirationInterval + time.Second) + + // message expired in store, but still isn't demultiplexed when we + // receive that message again + gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(11, channelA), PKIID: pkiIDInOrg1}) + select { + case <-time.After(time.Second): + case <-demuxedMsgs: + t.Fatal("Demultiplexing detected, even though it wasn't supposed to happen") + } + + // Lets check digests and state info store - state info expired, its add will do nothing and digest should not be sent + gc.AddToMsgStore(stateInfoMsg) + gc.HandleMessage(helloMsg) + select { + case <-time.After(time.Second): + case <-respondedChan: + t.Fatal("No digest should be sent") + } + + time.Sleep(gc.(*gossipChannel).GetConf().BlockExpirationInterval + time.Second) + // message removed from store, so it will be demultiplexed when we + // receive that message again + gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(11, channelA), PKIID: pkiIDInOrg1}) + select { + case <-time.After(time.Second): + t.Fatal("Haven't detected a demultiplexing within a time period") + case <-demuxedMsgs: + } + + // Lets check digests and state info store - state info removed as well, so it will be added back and digest will be created + gc.AddToMsgStore(stateInfoMsg) + gc.HandleMessage(helloMsg) + select { + case <-time.After(time.Second): + t.Fatal("Haven't responded to hello message within a time period") + case msg := <-respondedChan: + if msg.IsDigestMsg() { + assert.Equal(t, 1, len(msg.GetDataDig().Digests), "Number of digests returned by channel blockPuller incorrect") + } else { + t.Fatal("Not correct pull msg type in responce - expect digest") + } + } + + gc.Stop() +} + func TestChannelBadBlocks(t *testing.T) { t.Parallel() receivedMessages := make(chan *proto.SignedGossipMessage, 1) @@ -693,7 +797,8 @@ func TestChannelStateInfoSnapshot(t *testing.T) { gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: 
stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))}) // Ensure we process stateInfo snapshots that are OK - gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))}) + stateInfoMsg := &receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))} + gc.HandleMessage(stateInfoMsg) assert.NotEmpty(t, gc.GetPeers()) assert.Equal(t, "4", string(gc.GetPeers()[0].Metadata)) @@ -757,6 +862,41 @@ func TestChannelStateInfoSnapshot(t *testing.T) { invalidStateInfoSnapshot = stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, common.PKIidType("unknown"), channelA)) gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: invalidStateInfoSnapshot}) + // Lets expire msg in store + time.Sleep(gc.(*gossipChannel).GetConf().StateInfoExpirationInterval + time.Second) + + // Lets check is state info store can't add expired msg but appear as empty to outside world + gc.HandleMessage(stateInfoMsg) + assert.Empty(t, gc.GetPeers()) + // Lets see if snapshot now empty, after message in store expired + go gc.HandleMessage(snapshotReq) + select { + case <-time.After(time.Second): + t.Fatal("Haven't received a state info snapshot on time") + case msg := <-sentMessages: + elements := msg.GetStateSnapshot().Elements + assert.Len(t, elements, 0, "StateInfo snapshot should contain zero messages") + } + + // Lets make sure msg removed from store + time.Sleep(gc.(*gossipChannel).GetConf().StateInfoExpirationInterval + time.Second) + + // Lets check is state info store add just expired msg + gc.HandleMessage(stateInfoMsg) + assert.NotEmpty(t, gc.GetPeers()) + // Lets see if snapshot is not empty now, after message was added back to store + go gc.HandleMessage(snapshotReq) + select { + case <-time.After(time.Second): + t.Fatal("Haven't received a state info snapshot on time") + case msg := <-sentMessages: + elements := 
msg.GetStateSnapshot().Elements + assert.Len(t, elements, 1) + sMsg, err := elements[0].ToGossipMessage() + assert.NoError(t, err) + assert.Equal(t, []byte("4"), sMsg.GetStateInfo().Metadata) + } + } func TestChannelStop(t *testing.T) { diff --git a/gossip/gossip/chanstate.go b/gossip/gossip/chanstate.go index 39ced27b512..f8c88fa8bfe 100644 --- a/gossip/gossip/chanstate.go +++ b/gossip/gossip/chanstate.go @@ -126,12 +126,14 @@ type gossipAdapterImpl struct { func (ga *gossipAdapterImpl) GetConf() channel.Config { return channel.Config{ - ID: ga.conf.ID, - MaxBlockCountToStore: ga.conf.MaxBlockCountToStore, - PublishStateInfoInterval: ga.conf.PublishStateInfoInterval, - PullInterval: ga.conf.PullInterval, - PullPeerNum: ga.conf.PullPeerNum, - RequestStateInfoInterval: ga.conf.RequestStateInfoInterval, + ID: ga.conf.ID, + MaxBlockCountToStore: ga.conf.MaxBlockCountToStore, + PublishStateInfoInterval: ga.conf.PublishStateInfoInterval, + PullInterval: ga.conf.PullInterval, + PullPeerNum: ga.conf.PullPeerNum, + RequestStateInfoInterval: ga.conf.RequestStateInfoInterval, + BlockExpirationInterval: ga.conf.PullInterval * 100, + StateInfoExpirationInterval: ga.conf.PublishStateInfoInterval * 100, } } diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 48c25f714b2..f3cf40360d6 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -90,8 +90,10 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis return nil } + stateInfoExpirationInterval := conf.PublishStateInfoInterval * 100 + g := &gossipServiceImpl{ - stateInfoMsgStore: channel.NewStateInfoMessageStore(), + stateInfoMsgStore: channel.NewStateInfoMessageStore(stateInfoExpirationInterval), selfOrg: secAdvisor.OrgByPeerIdentity(selfIdentity), secAdvisor: secAdvisor, selfIdentity: selfIdentity, @@ -641,6 +643,7 @@ func (g *gossipServiceImpl) Stop() { g.toDieChan <- struct{}{} g.emitter.Stop() g.ChannelDeMultiplexer.Close() + 
g.stateInfoMsgStore.Stop() g.stopSignal.Wait() comWG.Wait() }