Skip to content

Commit

Permalink
[FAB-11837] Track membership changes
Browse files Browse the repository at this point in the history
adding a struct named mt to gossip channel to
track after membership changes per channel

Change-Id: I58dc053dba8d0b52a38a43b02e1bb0f72a4b9358
Signed-off-by: Inbar Badian <[email protected]>
  • Loading branch information
Inbar Badian committed Nov 29, 2018
1 parent edc43d0 commit c721874
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 4 deletions.
1 change: 0 additions & 1 deletion gossip/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ type identifier func() (*PeerIdentification, error)

// Discovery is the interface that represents a discovery module
type Discovery interface {

// Lookup returns a network member, or nil if not found
Lookup(PKIID common.PKIidType) *NetworkMember

Expand Down
102 changes: 101 additions & 1 deletion gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package channel
import (
"bytes"
"fmt"
"reflect"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -39,11 +40,11 @@ type Config struct {
RequestStateInfoInterval time.Duration
BlockExpirationInterval time.Duration
StateInfoCacheSweepInterval time.Duration
TimeForMembershipTracker time.Duration
}

// GossipChannel defines an object that deals with all channel-related messages
type GossipChannel interface {

// Self returns a StateInfoMessage about the peer
Self() *proto.SignedGossipMessage

Expand Down Expand Up @@ -150,6 +151,7 @@ type gossipChannel struct {
ledgerHeight uint64
incTime uint64
leftChannel int32
membershipTracker *membershipTracker
}

type membershipFilter struct {
Expand All @@ -162,6 +164,7 @@ func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
if mf.hasLeftChannel() {
return nil
}

var members []discovery.NetworkMember
for _, mem := range mf.adapter.GetMembership() {
if mf.eligibleForChannelAndSameOrg(mem) {
Expand Down Expand Up @@ -263,12 +266,27 @@ func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.M
go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
// Periodically request state info
go gc.periodicalInvocation(gc.requestStateInfo, gc.stateInfoRequestScheduler.C)

ticker := time.NewTicker(gc.GetConf().TimeForMembershipTracker)
gc.membershipTracker = &membershipTracker{
getPeersToTrack: gc.GetPeers,
report: gc.reportMembershipChanges,
stopChan: make(chan struct{}, 1),
tickerChannel: ticker.C,
}

go gc.membershipTracker.trackMembershipChanges()
return gc
}

func (gc *gossipChannel) reportMembershipChanges(input ...interface{}) {
gc.logger.Info(input...)
}

// Stop stop the channel operations
func (gc *gossipChannel) Stop() {
gc.stopChan <- struct{}{}
gc.membershipTracker.stopChan <- struct{}{}
gc.blocksPuller.Stop()
gc.stateInfoPublishScheduler.Stop()
gc.stateInfoRequestScheduler.Stop()
Expand Down Expand Up @@ -958,3 +976,85 @@ func GenerateMAC(pkiID common.PKIidType, channelID common.ChainID) []byte {
preImage := append([]byte(pkiID), []byte(channelID)...)
return common_utils.ComputeSHA256(preImage)
}

//membershipTracker is a struct for tracking changes in peers of the channel
type membershipTracker struct {
getPeersToTrack func() []discovery.NetworkMember
report func(...interface{})
stopChan chan struct{}
tickerChannel <-chan time.Time
}

//endpoints return all peers by their endpoints
func endpoints(members discovery.Members) [][]string {
var currView [][]string
for _, member := range members {
ep := member.Endpoint
epi := member.InternalEndpoint
var endPoints []string
if ep != epi {
endPoints = append(endPoints, ep, epi)
} else {
endPoints = append(endPoints, ep)
}
currView = append(currView, endPoints)
}
return currView
}

//checkIfPeersChanged checks which peers are offline and which are online for channel
func (mt *membershipTracker) checkIfPeersChanged(prevPeers discovery.Members, currPeers discovery.Members,
prevSetPeers map[string]struct{}, currSetPeers map[string]struct{}) {
var currView [][]string

wereInPrev := endpoints(prevPeers.Filter(func(member discovery.NetworkMember) bool {
_, exists := currSetPeers[string(member.PKIid)]
return !exists
}))
newInCurr := endpoints(currPeers.Filter(func(member discovery.NetworkMember) bool {
_, exists := prevSetPeers[string(member.PKIid)]
return !exists
}))
currView = endpoints(currPeers)

if !reflect.DeepEqual(wereInPrev, newInCurr) {
if len(wereInPrev) == 0 {
mt.report("Membership view has changed. peers went online: ", newInCurr, ", current view: ", currView)
} else if len(newInCurr) == 0 {
mt.report("Membership view has changed. peers went offline: ", wereInPrev, ", current view: ", currView)
} else {
mt.report("Membership view has changed. peers went offline: ", wereInPrev, ", peers went online: ", newInCurr, ", current view: ", currView)
}
}
}

func (mt *membershipTracker) createSetOfPeers(peersToMakeSet []discovery.NetworkMember) map[string]struct{} {
setPeers := make(map[string]struct{})
for _, prevPeer := range peersToMakeSet {
prevPeerID := string(prevPeer.PKIid)
setPeers[prevPeerID] = struct{}{}
}
return setPeers
}

func (mt *membershipTracker) trackMembershipChanges() {
prevSetPeers := make(map[string]struct{})
prev := mt.getPeersToTrack()
prevSetPeers = mt.createSetOfPeers(prev)
for {
currSetPeers := make(map[string]struct{})
//timeout to check changes in peers
select {
case <-mt.stopChan:
return
case <-mt.tickerChannel:
//get current peers
currPeers := mt.getPeersToTrack()
currSetPeers = mt.createSetOfPeers(currPeers)
mt.checkIfPeersChanged(prev, currPeers, prevSetPeers, currSetPeers)
prev = currPeers
prevSetPeers = map[string]struct{}{}
prevSetPeers = mt.createSetOfPeers(prev)
}
}
}
Loading

0 comments on commit c721874

Please sign in to comment.