diff --git a/gossip/gossip/certstore_test.go b/gossip/gossip/certstore_test.go index 95d880e8625..2e45b172aeb 100644 --- a/gossip/gossip/certstore_test.go +++ b/gossip/gossip/certstore_test.go @@ -110,7 +110,7 @@ func testCertificateUpdate(t *testing.T, updateFactory func(uint64) comm.Receive PullInterval: time.Millisecond * 500, Tag: proto.GossipMessage_EMPTY, Channel: nil, - Id: "id1", + ID: "id1", } sender := &senderMock{} memberSvc := &membershipSvcMock{} diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index 52e9d33d114..0f219a2b6f0 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -244,7 +244,7 @@ func (gc *gossipChannel) createBlockPuller() pull.Mediator { conf := pull.PullConfig{ MsgType: proto.PullMsgType_BlockMessage, Channel: []byte(gc.chainID), - Id: gc.GetConf().ID, + ID: gc.GetConf().ID, PeerCountToSelect: gc.GetConf().PullPeerNum, PullInterval: gc.GetConf().PullInterval, Tag: proto.GossipMessage_CHAN_AND_ORG, diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index d02f5cb5095..2b2c623067e 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -789,7 +789,7 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator { conf := pull.PullConfig{ MsgType: proto.PullMsgType_IdentityMsg, Channel: []byte(""), - Id: g.conf.SelfEndpoint, + ID: g.conf.SelfEndpoint, PeerCountToSelect: g.conf.PullPeerNum, PullInterval: g.conf.PullInterval, Tag: proto.GossipMessage_EMPTY, diff --git a/gossip/gossip/msgstore/msgs.go b/gossip/gossip/msgstore/msgs.go index 7b7316fe8eb..8d730813ceb 100644 --- a/gossip/gossip/msgstore/msgs.go +++ b/gossip/gossip/msgstore/msgs.go @@ -27,6 +27,8 @@ import ( // then the invalidation trigger on 0 was called when 1 was added. type invalidationTrigger func(message interface{}) +// NewMessageStore returns a new MessageStore with the message replacing +// policy and invalidation trigger passed. func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) MessageStore { return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger} } @@ -34,8 +36,8 @@ func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrig // MessageStore adds messages to an internal buffer. // When a message is received, it might: // - Be added to the buffer -// - Discarded because of some message already in the buffer (invalidated) -// - Make a message already in the buffer to be discarded (invalidates) +// - Discarded because of some message already in the buffer (invalidated) +// - Make a message already in the buffer to be discarded (invalidates) // When a message is invalidated, the invalidationTrigger is invoked on that message. type MessageStore interface { // add adds a message to the store diff --git a/gossip/gossip/pull/pullstore.go b/gossip/gossip/pull/pullstore.go index e16a68493c7..26718c30084 100644 --- a/gossip/gossip/pull/pullstore.go +++ b/gossip/gossip/pull/pullstore.go @@ -29,6 +29,7 @@ import ( "github.com/op/go-logging" ) +// Constants go here. const ( HelloMsgType PullMsgType = iota DigestMsgType @@ -40,8 +41,9 @@ const ( type PullMsgType int // MessageHook defines a function that will run after a certain pull message is received -type MessageHook func(itemIds []string, items []*proto.GossipMessage, msg comm.ReceivedMessage) +type MessageHook func(itemIDs []string, items []*proto.GossipMessage, msg comm.ReceivedMessage) +// Sender sends messages to remote peers type Sender interface { // Send sends a message to a list of remote peers Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) @@ -55,7 +57,7 @@ type MembershipService interface { // PullConfig defines the configuration of the pull mediator type PullConfig struct { - Id string + ID string PullInterval time.Duration // Duration between pull invocations PeerCountToSelect int // Number of peers to initiate pull with Tag proto.GossipMessage_Tag @@ -64,7 +66,7 @@ type PullConfig struct { } // Mediator is a component wrap a PullEngine and provides the methods -// it needs to perform pull synchronization.. +// it needs to perform pull synchronization. // The specialization of a pull mediator to a certain type of message is // done by the configuration, a IdentifierExtractor, IdentifierExtractor // given at construction, and also hooks that can be registered for each @@ -86,28 +88,29 @@ type Mediator interface { HandleMessage(msg comm.ReceivedMessage) } -// pullStoreImpl is an implementation of PullStore +// pullMediatorImpl is an implementation of Mediator type pullMediatorImpl struct { + sync.RWMutex + Sender msgType2Hook map[PullMsgType][]MessageHook idExtractor proto.IdentifierExtractor msgCons proto.MsgConsumer config PullConfig logger *logging.Logger - sync.RWMutex - itemId2msg map[string]*proto.GossipMessage - Sender - memBvc MembershipService - engine *algo.PullEngine + itemID2Msg map[string]*proto.GossipMessage + memBvc MembershipService + engine *algo.PullEngine } +// NewPullMediator returns a new Mediator func NewPullMediator(config PullConfig, sndr Sender, memSvc MembershipService, idExtractor proto.IdentifierExtractor, msgCons proto.MsgConsumer) Mediator { p := &pullMediatorImpl{ msgCons: msgCons, msgType2Hook: make(map[PullMsgType][]MessageHook), idExtractor: idExtractor, config: config, - logger: util.GetLogger(util.LoggingPullModule, config.Id), - itemId2msg: make(map[string]*proto.GossipMessage), + logger: util.GetLogger(util.LoggingPullModule, config.ID), + itemID2Msg: make(map[string]*proto.GossipMessage), memBvc: memSvc, Sender: sndr, } @@ -128,7 +131,7 @@ func (p *pullMediatorImpl) HandleMessage(m comm.ReceivedMessage) { p.logger.Debug(msg) - itemIds := []string{} + itemIDs := []string{} items := []*proto.GossipMessage{} var pullMsgType PullMsgType @@ -137,33 +140,33 @@ func (p *pullMediatorImpl) HandleMessage(m comm.ReceivedMessage) { p.engine.OnHello(helloMsg.Nonce, m) } if digest := msg.GetDataDig(); digest != nil { - itemIds = digest.Digests + itemIDs = digest.Digests pullMsgType = DigestMsgType p.engine.OnDigest(digest.Digests, digest.Nonce, m) } if req := msg.GetDataReq(); req != nil { - itemIds = req.Digests + itemIDs = req.Digests pullMsgType = RequestMsgType p.engine.OnReq(req.Digests, req.Nonce, m) } if res := msg.GetDataUpdate(); res != nil { - itemIds = make([]string, len(res.Data)) + itemIDs = make([]string, len(res.Data)) items = make([]*proto.GossipMessage, len(res.Data)) pullMsgType = ResponseMsgType for i, pulledMsg := range res.Data { p.msgCons(pulledMsg) - itemIds[i] = p.idExtractor(pulledMsg) + itemIDs[i] = p.idExtractor(pulledMsg) items[i] = pulledMsg p.Lock() - p.itemId2msg[itemIds[i]] = pulledMsg + p.itemID2Msg[itemIDs[i]] = pulledMsg p.Unlock() } - p.engine.OnRes(itemIds, res.Nonce) + p.engine.OnRes(itemIDs, res.Nonce) } // Invoke hooks for relevant message type for _, h := range p.hooksByMsgType(pullMsgType) { - h(itemIds, items, m) + h(itemIDs, items, m) } } @@ -183,18 +186,18 @@ func (p *pullMediatorImpl) RegisterMsgHook(pullMsgType PullMsgType, hook Message func (p *pullMediatorImpl) Add(msg *proto.GossipMessage) { p.Lock() defer p.Unlock() - itemId := p.idExtractor(msg) - p.itemId2msg[itemId] = msg - p.engine.Add(itemId) + itemID := p.idExtractor(msg) + p.itemID2Msg[itemID] = msg + p.engine.Add(itemID) } // Remove removes a GossipMessage from the store func (p *pullMediatorImpl) Remove(msg *proto.GossipMessage) { p.Lock() defer p.Unlock() - itemId := p.idExtractor(msg) - delete(p.itemId2msg, itemId) - p.engine.Remove(itemId) + itemID := p.idExtractor(msg) + delete(p.itemID2Msg, itemID) + p.engine.Remove(itemID) } // SelectPeers returns a slice of peers which the engine will initiate the protocol with @@ -271,7 +274,7 @@ func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce ui p.RLock() defer p.RUnlock() for _, item := range items { - if msg, exists := p.itemId2msg[item]; exists { + if msg, exists := p.itemID2Msg[item]; exists { items2return = append(items2return, msg) } } @@ -314,6 +317,7 @@ func (p *pullMediatorImpl) hooksByMsgType(msgType PullMsgType) []MessageHook { return returnedHooks } +// SelectEndpoints select k peers from peerPool and returns them. func SelectEndpoints(k int, peerPool []discovery.NetworkMember) []*comm.RemotePeer { if len(peerPool) < k { k = len(peerPool) diff --git a/gossip/gossip/pull/pullstore_test.go b/gossip/gossip/pull/pullstore_test.go index a710d487ac7..7f0e1bb2824 100644 --- a/gossip/gossip/pull/pullstore_test.go +++ b/gossip/gossip/pull/pullstore_test.go @@ -114,7 +114,7 @@ func createPullInstance(endpoint string, peer2PullInst map[string]*pullInstance) conf := PullConfig{ MsgType: proto.PullMsgType_BlockMessage, Channel: []byte(""), - Id: endpoint, + ID: endpoint, PeerCountToSelect: 3, PullInterval: pullInterval, Tag: proto.GossipMessage_EMPTY, diff --git a/gossip/identity/identity.go b/gossip/identity/identity.go index 65407a2284e..1b785d3ab4e 100644 --- a/gossip/identity/identity.go +++ b/gossip/identity/identity.go @@ -55,7 +55,7 @@ type identityMapperImpl struct { sync.RWMutex } -// NewIdentityStore method, all we need is a reference to a MessageCryptoService +// NewIdentityMapper method, all we need is a reference to a MessageCryptoService func NewIdentityMapper(mcs api.MessageCryptoService) Mapper { return &identityMapperImpl{ mcs: mcs, diff --git a/gossip/proto/extensions.go b/gossip/proto/extensions.go index 69e933d9f72..3fc61a3f44c 100644 --- a/gossip/proto/extensions.go +++ b/gossip/proto/extensions.go @@ -18,7 +18,6 @@ package proto import ( "bytes" - "fmt" "github.com/golang/protobuf/proto" diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index e4db73e83fe..f3ed41fed71 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -46,7 +46,7 @@ var ( logger = gossipUtil.GetLogger(gossipUtil.LoggingStateModule, "") ) -var orgId = []byte("ORG1") +var orgID = []byte("ORG1") var anchorPeerIdentity = api.PeerIdentityType("identityInOrg1") type joinChanMsg struct { @@ -69,7 +69,7 @@ type orgCryptoService struct { // OrgByPeerIdentity returns the OrgIdentityType // of a given peer identity func (*orgCryptoService) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { - return orgId + return orgID } // Verify verifies a JoinChannelMessage, returns nil on success,