Skip to content

Commit

Permalink
[FAB-2780] Expire leadership, stateInfo and data
Browse files Browse the repository at this point in the history
Taking care of leadership messages store, state info cache
and blocks message store in channel

Change-Id: I9122fd7569ad156b248b269d8ab4016b7e22d176
Signed-off-by: Gennady Laventman <[email protected]>
  • Loading branch information
gennadylaventman committed Apr 19, 2017
1 parent 9d12166 commit 5eb5d07
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 26 deletions.
4 changes: 4 additions & 0 deletions gossip/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,7 @@ func getLeadershipDeclarationInterval() time.Duration {
func getLeaderElectionDuration() time.Duration {
return util.GetDurationOrDefault("peer.gossip.election.leaderElectionDuration", time.Second*5)
}

func GetMsgExpirationTimeout() time.Duration {
return getLeaderAliveThreshold() * 10
}
55 changes: 43 additions & 12 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/filter"
"github.com/hyperledger/fabric/gossip/gossip/msgstore"
"github.com/hyperledger/fabric/gossip/gossip/pull"
Expand All @@ -45,6 +46,9 @@ type Config struct {
PullPeerNum int
PullInterval time.Duration
RequestStateInfoInterval time.Duration

BlockExpirationInterval time.Duration
StateInfoExpirationInterval time.Duration
}

// GossipChannel defines an object that deals with all channel-related messages
Expand Down Expand Up @@ -166,13 +170,22 @@ func NewGossipChannel(pkiID common.PKIidType, mcs api.MessageCryptoService, chai
gc.memFilter = &membershipFilter{adapter: gc.Adapter, gossipChannel: gc}

comparator := proto.NewGossipMessageComparator(adapter.GetConf().MaxBlockCountToStore)
gc.blockMsgStore = msgstore.NewMessageStore(comparator, func(m interface{}) {

gc.blocksPuller = gc.createBlockPuller()

gc.blockMsgStore = msgstore.NewMessageStoreExpirable(comparator, func(m interface{}) {
gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage))
}, gc.GetConf().BlockExpirationInterval, nil, nil, func(m interface{}) {
gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage))
})

gc.stateInfoMsgStore = newStateInfoCache()
gc.blocksPuller = gc.createBlockPuller()
gc.leaderMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})
gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoExpirationInterval)

ttl := election.GetMsgExpirationTimeout()
noopFunc := func(m interface{}) {}
pol := proto.NewGossipMessageComparator(0)

gc.leaderMsgStore = msgstore.NewMessageStoreExpirable(pol, noopFunc, ttl, nil, nil, nil)

gc.ConfigureChannel(joinMsg)

Expand All @@ -189,6 +202,9 @@ func (gc *gossipChannel) Stop() {
gc.blocksPuller.Stop()
gc.stateInfoPublishScheduler.Stop()
gc.stateInfoRequestScheduler.Stop()
gc.leaderMsgStore.Stop()
gc.stateInfoMsgStore.Stop()
gc.blockMsgStore.Stop()
}

func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
Expand Down Expand Up @@ -586,16 +602,31 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}

// NewStateInfoMessageStore returns a MessageStore
func NewStateInfoMessageStore() msgstore.MessageStore {
return msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})
// NewStateInfoMessageStore returns a expirable MessageStore
// ttl is time duration before msg expires and removed from store
func NewStateInfoMessageStore(ttl time.Duration) msgstore.MessageStore {
return NewStateInfoMessageStoreWithCallback(ttl, nil)
}

func newStateInfoCache() *stateInfoCache {
return &stateInfoCache{
MembershipStore: util.NewMembershipStore(),
MessageStore: NewStateInfoMessageStore(),
// NewStateInfoMessageStoreWithCallback returns a exiprable MessageStore
// Callback invoked once message expires and removed from store
// ttl is time duration before msg expires
func NewStateInfoMessageStoreWithCallback(ttl time.Duration, callback func(m interface{})) msgstore.MessageStore {
pol := proto.NewGossipMessageComparator(0)
noopTrigger := func(m interface{}) {}
return msgstore.NewMessageStoreExpirable(pol, noopTrigger, ttl, nil, nil, callback)
}

func newStateInfoCache(ttl time.Duration) *stateInfoCache {
membershipStore := util.NewMembershipStore()
callback := func(m interface{}) {
membershipStore.Remove(m.(*proto.SignedGossipMessage).GetStateInfo().PkiId)
}
s := &stateInfoCache{
MembershipStore: membershipStore,
MessageStore: NewStateInfoMessageStoreWithCallback(ttl, callback),
}
return s
}

// stateInfoCache is actually a messageStore
Expand All @@ -611,8 +642,8 @@ type stateInfoCache struct {
// Message must be a StateInfo message.
func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
added := cache.MessageStore.Add(msg)
pkiID := msg.GetStateInfo().PkiId
if added {
pkiID := msg.GetStateInfo().PkiId
cache.MembershipStore.Put(pkiID, msg)
}
return added
Expand Down
154 changes: 147 additions & 7 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ type msgMutator func(message *proto.Envelope)

var conf = Config{
ID: "test",
PublishStateInfoInterval: time.Millisecond * 100,
MaxBlockCountToStore: 100,
PullPeerNum: 3,
PullInterval: time.Second,
RequestStateInfoInterval: time.Millisecond * 100,
PublishStateInfoInterval: time.Millisecond * 100,
MaxBlockCountToStore: 100,
PullPeerNum: 3,
PullInterval: time.Second,
RequestStateInfoInterval: time.Millisecond * 100,
BlockExpirationInterval: time.Second * 6,
StateInfoExpirationInterval: time.Second * 6,
}

func init() {
Expand Down Expand Up @@ -275,7 +277,8 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
})

gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
gc.UpdateStateInfo(createStateInfoMsg(ledgerHeight, pkiIDInOrg1, channelA))
stateInfoMsg := createStateInfoMsg(ledgerHeight, pkiIDInOrg1, channelA)
gc.UpdateStateInfo(stateInfoMsg)

var msg *proto.SignedGossipMessage
select {
Expand All @@ -289,6 +292,14 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
height, err := strconv.ParseInt(string(md), 10, 64)
assert.NoError(t, err, "ReceivedMetadata is invalid")
assert.Equal(t, ledgerHeight, int(height), "Received different ledger height than expected")

// We will not update StateInfo in store, so store will become empty
time.Sleep(conf.StateInfoExpirationInterval + time.Second)
//Store is empty
assert.Equal(t, 0, gc.(*gossipChannel).stateInfoMsgStore.MessageStore.Size(), "StateInfo MessageStore should be empty")
assert.Equal(t, 0, gc.(*gossipChannel).stateInfoMsgStore.MembershipStore.Size(), "StateInfo MembershipStore should be empty")

gc.Stop()
}

func TestChannelPull(t *testing.T) {
Expand Down Expand Up @@ -523,6 +534,99 @@ func TestChannelAddToMessageStore(t *testing.T) {
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
}

func TestChannelAddToMessageStoreExpire(t *testing.T) {
t.Parallel()

cs := &cryptoService{}
cs.On("VerifyBlock", mock.Anything).Return(nil)
demuxedMsgs := make(chan *proto.SignedGossipMessage, 1)
adapter := new(gossipAdapterMock)
configureAdapter(adapter)
gc := NewGossipChannel(pkiIDInOrg1, cs, channelA, adapter, &joinChanMsg{})
adapter.On("Gossip", mock.Anything)
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
demuxedMsgs <- arg.Get(0).(*proto.SignedGossipMessage)
})

respondedChan := make(chan *proto.GossipMessage, 1)
messageRelayer := func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.GossipMessage)
respondedChan <- msg
}

// We make sure that if we get a new message it is de-multiplexed,
gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(11, channelA), PKIID: pkiIDInOrg1})
select {
case <-time.After(time.Second):
t.Fatal("Haven't detected a demultiplexing within a time period")
case <-demuxedMsgs:
}

// Lets check digests and state info store
stateInfoMsg := createStateInfoMsg(10, pkiIDInOrg1, channelA)
gc.AddToMsgStore(stateInfoMsg)
helloMsg := createHelloMsg(pkiIDInOrg1)
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
gc.HandleMessage(helloMsg)
select {
case <-time.After(time.Second):
t.Fatal("Haven't responded to hello message within a time period")
case msg := <-respondedChan:
if msg.IsDigestMsg() {
assert.Equal(t, 1, len(msg.GetDataDig().Digests), "Number of digests returned by channel blockPuller incorrect")
} else {
t.Fatal("Not correct pull msg type in responce - expect digest")
}
}

time.Sleep(gc.(*gossipChannel).GetConf().BlockExpirationInterval + time.Second)

// message expired in store, but still isn't demultiplexed when we
// receive that message again
gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(11, channelA), PKIID: pkiIDInOrg1})
select {
case <-time.After(time.Second):
case <-demuxedMsgs:
t.Fatal("Demultiplexing detected, even though it wasn't supposed to happen")
}

// Lets check digests and state info store - state info expired, its add will do nothing and digest should not be sent
gc.AddToMsgStore(stateInfoMsg)
gc.HandleMessage(helloMsg)
select {
case <-time.After(time.Second):
case <-respondedChan:
t.Fatal("No digest should be sent")
}

time.Sleep(gc.(*gossipChannel).GetConf().BlockExpirationInterval + time.Second)
// message removed from store, so it will be demultiplexed when we
// receive that message again
gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(11, channelA), PKIID: pkiIDInOrg1})
select {
case <-time.After(time.Second):
t.Fatal("Haven't detected a demultiplexing within a time period")
case <-demuxedMsgs:
}

// Lets check digests and state info store - state info removed as well, so it will be added back and digest will be created
gc.AddToMsgStore(stateInfoMsg)
gc.HandleMessage(helloMsg)
select {
case <-time.After(time.Second):
t.Fatal("Haven't responded to hello message within a time period")
case msg := <-respondedChan:
if msg.IsDigestMsg() {
assert.Equal(t, 1, len(msg.GetDataDig().Digests), "Number of digests returned by channel blockPuller incorrect")
} else {
t.Fatal("Not correct pull msg type in responce - expect digest")
}
}

gc.Stop()
}

func TestChannelBadBlocks(t *testing.T) {
t.Parallel()
receivedMessages := make(chan *proto.SignedGossipMessage, 1)
Expand Down Expand Up @@ -693,7 +797,8 @@ func TestChannelStateInfoSnapshot(t *testing.T) {
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))})

// Ensure we process stateInfo snapshots that are OK
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))})
stateInfoMsg := &receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))}
gc.HandleMessage(stateInfoMsg)
assert.NotEmpty(t, gc.GetPeers())
assert.Equal(t, "4", string(gc.GetPeers()[0].Metadata))

Expand Down Expand Up @@ -757,6 +862,41 @@ func TestChannelStateInfoSnapshot(t *testing.T) {
invalidStateInfoSnapshot = stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, common.PKIidType("unknown"), channelA))
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: invalidStateInfoSnapshot})

// Lets expire msg in store
time.Sleep(gc.(*gossipChannel).GetConf().StateInfoExpirationInterval + time.Second)

// Lets check is state info store can't add expired msg but appear as empty to outside world
gc.HandleMessage(stateInfoMsg)
assert.Empty(t, gc.GetPeers())
// Lets see if snapshot now empty, after message in store expired
go gc.HandleMessage(snapshotReq)
select {
case <-time.After(time.Second):
t.Fatal("Haven't received a state info snapshot on time")
case msg := <-sentMessages:
elements := msg.GetStateSnapshot().Elements
assert.Len(t, elements, 0, "StateInfo snapshot should contain zero messages")
}

// Lets make sure msg removed from store
time.Sleep(gc.(*gossipChannel).GetConf().StateInfoExpirationInterval + time.Second)

// Lets check is state info store add just expired msg
gc.HandleMessage(stateInfoMsg)
assert.NotEmpty(t, gc.GetPeers())
// Lets see if snapshot is not empty now, after message was added back to store
go gc.HandleMessage(snapshotReq)
select {
case <-time.After(time.Second):
t.Fatal("Haven't received a state info snapshot on time")
case msg := <-sentMessages:
elements := msg.GetStateSnapshot().Elements
assert.Len(t, elements, 1)
sMsg, err := elements[0].ToGossipMessage()
assert.NoError(t, err)
assert.Equal(t, []byte("4"), sMsg.GetStateInfo().Metadata)
}

}

func TestChannelStop(t *testing.T) {
Expand Down
14 changes: 8 additions & 6 deletions gossip/gossip/chanstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,14 @@ type gossipAdapterImpl struct {

func (ga *gossipAdapterImpl) GetConf() channel.Config {
return channel.Config{
ID: ga.conf.ID,
MaxBlockCountToStore: ga.conf.MaxBlockCountToStore,
PublishStateInfoInterval: ga.conf.PublishStateInfoInterval,
PullInterval: ga.conf.PullInterval,
PullPeerNum: ga.conf.PullPeerNum,
RequestStateInfoInterval: ga.conf.RequestStateInfoInterval,
ID: ga.conf.ID,
MaxBlockCountToStore: ga.conf.MaxBlockCountToStore,
PublishStateInfoInterval: ga.conf.PublishStateInfoInterval,
PullInterval: ga.conf.PullInterval,
PullPeerNum: ga.conf.PullPeerNum,
RequestStateInfoInterval: ga.conf.RequestStateInfoInterval,
BlockExpirationInterval: ga.conf.PullInterval * 100,
StateInfoExpirationInterval: ga.conf.PublishStateInfoInterval * 100,
}
}

Expand Down
5 changes: 4 additions & 1 deletion gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
return nil
}

stateInfoExpirationInterval := conf.PublishStateInfoInterval * 100

g := &gossipServiceImpl{
stateInfoMsgStore: channel.NewStateInfoMessageStore(),
stateInfoMsgStore: channel.NewStateInfoMessageStore(stateInfoExpirationInterval),
selfOrg: secAdvisor.OrgByPeerIdentity(selfIdentity),
secAdvisor: secAdvisor,
selfIdentity: selfIdentity,
Expand Down Expand Up @@ -641,6 +643,7 @@ func (g *gossipServiceImpl) Stop() {
g.toDieChan <- struct{}{}
g.emitter.Stop()
g.ChannelDeMultiplexer.Close()
g.stateInfoMsgStore.Stop()
g.stopSignal.Wait()
comWG.Wait()
}
Expand Down

0 comments on commit 5eb5d07

Please sign in to comment.