Skip to content

Commit

Permalink
Some Gossip code refactoring and log msg formatting
Browse files Browse the repository at this point in the history
1) ToGossipMessage now returns a more meaningful error and the
   methods that call it now log an error that is based on the context
2) Follow-up fix of a CR comment:
   https://gerrit.hyperledger.org/r/#/c/5965/13/gossip/comm/comm_impl.go@532
3) selfNetworkMember() accidently logged a log entry that says
   it is starting a new gossip service, but actually the method
   is invoked more than once- so multiple messages were logged
   instead of only one, so I extracted the log invocation
   to the constructor method.
4) The SignedGossipMessage has a payload which is a byte slice.
   No reason to log it, so I implemented a String() method for
   the SignedGossipMessage that just logs the length of the
   Payload and signature, and of the secret payload.
5) The Payload of the DataMessage that contains a ledger block
   should be printed as the length of the block and its sequence.
   I implemented String() method for GossipMessage_DataMsg.
6) Some messages contain a snapshot of envelopes, or payloads
   and are printed as a slice of tuples of payload (bytes)
   and signatures (bytes).
   This is ugly, fills the logs and doesn't give any info.
   I made the String() of GossipMessage print only the number of items
   in case it contains such a message.

Signed-off-by: Yacov Manevich <[email protected]>
Change-Id: Ie59b08ed94ef2f602b36d2ba3ab9b7260bf3f947
  • Loading branch information
yacovm committed Feb 27, 2017
1 parent 9a09ac0 commit a5b09f0
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 10 deletions.
6 changes: 4 additions & 2 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"net"
"os"
"reflect"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -519,8 +520,7 @@ func readWithTimeout(stream interface{}, timeout time.Duration, address string)
}
incChan <- msg
}
}
if clStr, isClientStr := stream.(proto.Gossip_GossipStreamClient); isClientStr {
} else if clStr, isClientStr := stream.(proto.Gossip_GossipStreamClient); isClientStr {
if m, err := clStr.Recv(); err == nil {
msg, err := m.ToGossipMessage()
if err != nil {
Expand All @@ -529,6 +529,8 @@ func readWithTimeout(stream interface{}, timeout time.Duration, address string)
}
incChan <- msg
}
} else {
panic(fmt.Errorf("Stream isn't a GossipStreamServer or a GossipStreamClient, but %v. Aborting", reflect.TypeOf(stream)))
}
}()
select {
Expand Down
4 changes: 2 additions & 2 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
for _, env := range memResp.Alive {
am, err := env.ToGossipMessage()
if err != nil {
d.logger.Warning("Failed extracting GossipMessage from envelope:", err)
d.logger.Warning("Membership response contains an invalid message from an online peer:", err)
return
}
if !am.IsAliveMsg() {
Expand All @@ -333,7 +333,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
for _, env := range memResp.Dead {
dm, err := env.ToGossipMessage()
if err != nil {
d.logger.Warning("Failed extracting GossipMessage from envelope:", err)
d.logger.Warning("Membership response contains an invalid message from an offline peer", err)
return
}
if !d.crypt.ValidateAliveMsg(m) {
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/certstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (cs *certStore) handleMessage(msg proto.ReceivedMessage) {
for _, env := range update.Data {
m, err := env.ToGossipMessage()
if err != nil {
cs.logger.Warning("Failed extracting GossipMessage from envelope:", err)
cs.logger.Warning("Data update contains an invalid message:", err)
return
}
if !m.IsIdentityMsg() {
Expand Down
4 changes: 2 additions & 2 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
for _, item := range m.GetDataUpdate().Data {
gMsg, err := item.ToGossipMessage()
if err != nil {
gc.logger.Warning("Failed extracting GossipMessage from envelope:", err)
gc.logger.Warning("Data update contains an invalid message:", err)
return
}
if !bytes.Equal(gMsg.Channel, []byte(gc.chainID)) {
Expand Down Expand Up @@ -444,7 +444,7 @@ func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender c
for _, envelope := range m.GetStateSnapshot().Elements {
stateInf, err := envelope.ToGossipMessage()
if err != nil {
gc.logger.Warning("Failed extracting GossipMessage from envelope:", err)
gc.logger.Warning("StateInfo snapshot contains an invalid message:", err)
return
}
if !stateInf.IsStateInfoMsg() {
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
g.disSecAdap = g.newDiscoverySecurityAdapter()

g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, g.selfNetworkMember(), g.discAdapter, g.disSecAdap)
g.logger.Info("Creating gossip service with self membership of", g.selfNetworkMember())

g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)

Expand All @@ -141,7 +142,6 @@ func (g *gossipServiceImpl) selfNetworkMember() discovery.NetworkMember {
Metadata: []byte{},
InternalEndpoint: g.conf.InternalEndpoint,
}
g.logger.Info("Creating gossip service with self membership of", self)
return self
}

Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/pull/pullstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (p *pullMediatorImpl) HandleMessage(m proto.ReceivedMessage) {
for i, pulledMsg := range res.Data {
msg, err := pulledMsg.ToGossipMessage()
if err != nil {
p.logger.Warning("Failed extracting GossipMessage from envelope:", err)
p.logger.Warning("Data update contains an invalid message:", err)
return
}
p.msgCons(msg)
Expand Down
75 changes: 74 additions & 1 deletion protos/gossip/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,13 @@ func (m *GossipMessage) IsTagLegal() error {
return fmt.Errorf("Unknown message type: %v", m)
}

// Verifier receives a peer identity, a signature and a message
// and returns nil if the signature on the message could be verified
// using the given identity.
type Verifier func(peerIdentity []byte, signature, message []byte) error

// Signer signs a message, and returns (signature, nil)
// on success, and nil and an error on failure.
type Signer func(msg []byte) ([]byte, error)

// ReceivedMessage is a GossipMessage wrapper that
Expand Down Expand Up @@ -356,6 +362,7 @@ func (m *SignedGossipMessage) Sign(signer Signer) *Envelope {
return e
}

// NoopSign creates a SignedGossipMessage with a nil signature
func (m *GossipMessage) NoopSign() *SignedGossipMessage {
signer := func(msg []byte) ([]byte, error) {
return nil, nil
Expand Down Expand Up @@ -397,22 +404,29 @@ func (m *SignedGossipMessage) Verify(peerIdentity []byte, verify Verifier) error
return nil
}

// IsSigned returns whether the message
// has a signature in the envelope.
func (m *SignedGossipMessage) IsSigned() bool {
return m.Envelope != nil && m.Envelope.Payload != nil && m.Envelope.Signature != nil
}

// ToGossipMessage un-marshals a given envelope and creates a
// SignedGossipMessage out of it.
// Returns an error if un-marshaling fails.
func (e *Envelope) ToGossipMessage() (*SignedGossipMessage, error) {
msg := &GossipMessage{}
err := proto.Unmarshal(e.Payload, msg)
if err != nil {
return nil, err
return nil, fmt.Errorf("Failed unmarshaling GossipMessage from envelope: %v", err)
}
return &SignedGossipMessage{
GossipMessage: msg,
Envelope: e,
}, nil
}

// SignSecret signs the secret payload and creates
// a secret envelope out of it.
func (e *Envelope) SignSecret(signer Signer, secret *Secret) {
payload, err := proto.Marshal(secret)
if err != nil {
Expand All @@ -428,6 +442,9 @@ func (e *Envelope) SignSecret(signer Signer, secret *Secret) {
}
}

// InternalEndpoint returns the internal endpoint
// in the secret envelope, or an empty string
// if a failure occurs.
func (s *SecretEnvelope) InternalEndpoint() string {
secret := &Secret{}
if err := proto.Unmarshal(s.Payload, secret); err != nil {
Expand All @@ -443,6 +460,62 @@ type SignedGossipMessage struct {
*GossipMessage
}

func (p *Payload) toString() string {
return fmt.Sprintf("Block message: {Data: %d bytes, seq: %d}", len(p.Data), p.SeqNum)
}

func (du *DataUpdate) toString() string {
mType := PullMsgType_name[int32(du.MsgType)]
return fmt.Sprintf("Type: %s, items: %d, nonce: %d", mType, len(du.Data), du.Nonce)
}

func (mr *MembershipResponse) toString() string {
return fmt.Sprintf("MembershipResponse with Alive: %d, Dead: %d", len(mr.Alive), len(mr.Dead))
}

func (sis *StateInfoSnapshot) toString() string {
return fmt.Sprintf("StateInfoSnapshot with %d items", len(sis.Elements))
}

// String returns a string representation
// of a SignedGossipMessage
func (m *SignedGossipMessage) String() string {
env := "No envelope"
if m.Envelope != nil {
var secretEnv string
if m.SecretEnvelope != nil {
pl := len(m.SecretEnvelope.Payload)
sl := len(m.SecretEnvelope.Signature)
secretEnv = fmt.Sprintf(" Secret payload: %d bytes, Secret Signature: %d bytes", pl, sl)
}
env = fmt.Sprintf("%d bytes, Signature: %d bytes%s", len(m.Envelope.Payload), len(m.Envelope.Signature), secretEnv)
}
gMsg := "No gossipMessage"
if m.GossipMessage != nil {
var isSimpleMsg bool
if m.GetStateResponse() != nil {
gMsg = fmt.Sprintf("StateResponse with %d items", len(m.GetStateResponse().Payloads))
} else if m.IsDataMsg() {
gMsg = m.GetDataMsg().Payload.toString()
} else if m.IsDataUpdate() {
update := m.GetDataUpdate()
gMsg = fmt.Sprintf("DataUpdate: %s", update.toString())
} else if m.GetMemRes() != nil {
gMsg = m.GetMemRes().toString()
} else if m.IsStateInfoSnapshot() {
gMsg = m.GetStateSnapshot().toString()
} else {
gMsg = m.GossipMessage.String()
isSimpleMsg = true
}
if !isSimpleMsg {
desc := fmt.Sprintf("Channel: %v, nonce: %d, tag: %s", m.Channel, m.Nonce, GossipMessage_Tag_name[int32(m.Tag)])
gMsg = fmt.Sprintf("%s %s", desc, gMsg)
}
}
return fmt.Sprintf("GossipMessage: %v, Envelope: %s", gMsg, env)
}

// Abs returns abs(a-b)
func abs(a, b uint64) uint64 {
if a > b {
Expand Down
149 changes: 149 additions & 0 deletions protos/gossip/extensions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gossip

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestToString(t *testing.T) {
// Ensure we don't print the byte content when we
// log messages.
// Each payload or signature contains '2' so we would've logged
// them if not for the overloading of the String() method in SignedGossipMessage

// The following line proves that the envelopes constructed in this test
// have "2" in them when they are printed
assert.Contains(t, fmt.Sprintf("%v", envelopes()[0]), "2")
// and the following does the same for payloads:
dMsg := &DataMessage{
Payload: &Payload{
SeqNum: 3,
Data: []byte{2, 2, 2, 2, 2},
},
}
assert.Contains(t, fmt.Sprintf("%v", dMsg), "2")

// Now we construct all types of messages that have envelopes or payloads in them
// and see that "2" is not outputted into their formatting even though it is found
// as a sub-message of the outer message.

sMsg := &SignedGossipMessage{
GossipMessage: &GossipMessage{
Tag: GossipMessage_EMPTY,
Nonce: 5,
Channel: []byte("A"),
Content: &GossipMessage_DataMsg{
DataMsg: &DataMessage{
Payload: &Payload{
SeqNum: 3,
Data: []byte{2, 2, 2, 2, 2},
},
},
},
},
Envelope: &Envelope{
Payload: []byte{0, 1, 2, 3, 4, 5, 6},
Signature: []byte{0, 1, 2},
SecretEnvelope: &SecretEnvelope{
Payload: []byte{0, 1, 2, 3, 4, 5},
Signature: []byte{0, 1, 2},
},
},
}
assert.NotContains(t, fmt.Sprintf("%v", sMsg), "2")

sMsg = &SignedGossipMessage{
GossipMessage: &GossipMessage{
Channel: []byte("A"),
Tag: GossipMessage_EMPTY,
Nonce: 5,
Content: &GossipMessage_DataUpdate{
DataUpdate: &DataUpdate{
Nonce: 11,
MsgType: PullMsgType_BlockMessage,
Data: envelopes(),
},
},
},
Envelope: envelopes()[0],
}
assert.NotContains(t, fmt.Sprintf("%v", sMsg), "2")

sMsg = &SignedGossipMessage{
GossipMessage: &GossipMessage{
Channel: []byte("A"),
Tag: GossipMessage_EMPTY,
Nonce: 5,
Content: &GossipMessage_MemRes{
MemRes: &MembershipResponse{
Alive: envelopes(),
Dead: envelopes(),
},
},
},
Envelope: envelopes()[0],
}
assert.NotContains(t, fmt.Sprintf("%v", sMsg), "2")

sMsg = &SignedGossipMessage{
GossipMessage: &GossipMessage{
Channel: []byte("A"),
Tag: GossipMessage_EMPTY,
Nonce: 5,
Content: &GossipMessage_StateSnapshot{
StateSnapshot: &StateInfoSnapshot{
Elements: envelopes(),
},
},
},
Envelope: envelopes()[0],
}
assert.NotContains(t, fmt.Sprintf("%v", sMsg), "2")

sMsg = &SignedGossipMessage{
GossipMessage: &GossipMessage{
Channel: []byte("A"),
Tag: GossipMessage_EMPTY,
Nonce: 5,
Content: &GossipMessage_StateResponse{
StateResponse: &RemoteStateResponse{
Payloads: []*Payload{
{Data: []byte{2, 2, 2}},
},
},
},
},
Envelope: envelopes()[0],
}
assert.NotContains(t, fmt.Sprintf("%v", sMsg), "2")
}

func envelopes() []*Envelope {
return []*Envelope{
{Payload: []byte{2, 2, 2},
Signature: []byte{2, 2, 2},
SecretEnvelope: &SecretEnvelope{
Payload: []byte{2, 2, 2},
Signature: []byte{2, 2, 2},
},
},
}
}

0 comments on commit a5b09f0

Please sign in to comment.