Skip to content

Commit

Permalink
Merge "Add new test to check state communication"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Jan 9, 2017
2 parents 1b7bb44 + 9e05f49 commit 01de0e4
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 5 deletions.
12 changes: 7 additions & 5 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ var logFormat = logging.MustStringFormatter(
`%{color}%{level} %{longfunc}():%{color:reset}(%{module})%{message}`,
)

var remoteStateMsgFilter = func(message interface{}) bool {
return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
}


const (
defPollingPeriod = 200 * time.Millisecond
defAntiEntropyInterval = 10 * time.Second
Expand Down Expand Up @@ -89,7 +94,6 @@ type GossipStateProviderImpl struct {
// NewGossipStateProvider creates initialized instance of gossip state provider
func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider {
logger, _ := logging.GetLogger("GossipStateProvider")
logging.SetLevel(logging.DEBUG, logger.Module)

gossipChan, _ := g.Accept(func(message interface{}) bool {
// Get only data messages
Expand All @@ -98,9 +102,7 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer
}, false)

// Filter message which are only relevant for state transfer
_, commChan := g.Accept(func(message interface{}) bool {
return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
}, true)
_, commChan := g.Accept(remoteStateMsgFilter, true)

height, err := committer.LedgerHeight()

Expand Down Expand Up @@ -139,7 +141,7 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer
s.logger.Infof("Updating node metadata information, current ledger sequence is at = %d, next expected block is = %d", state.LedgerHeight, s.payloads.Next())
bytes, err := state.Bytes()
if err == nil {
s.logger.Debug("[VVV]: Updating gossip metadate state", state)
s.logger.Debug("Updating gossip metadate state", state)
g.UpdateChannelMetadata(bytes, common2.ChainID(s.chainID))
} else {
s.logger.Errorf("Unable to serialize node meta state, error = %s", err)
Expand Down
67 changes: 67 additions & 0 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"strconv"
"sync"
"testing"
"time"

Expand All @@ -28,12 +29,14 @@ import (
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/util"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/proto"
pcomm "github.com/hyperledger/fabric/protos/common"
"github.com/op/go-logging"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

var (
Expand Down Expand Up @@ -332,6 +335,70 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {
}, 60*time.Second)
}

func TestGossipStateProvider_TestStateMessages(t *testing.T) {
viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node")
ledgermgmt.InitializeTestEnv()
defer ledgermgmt.CleanupTestEnv()

bootPeer := newPeerNode(newGossipConfig(0, 100), newCommitter(0))
defer bootPeer.shutdown()

peer := newPeerNode(newGossipConfig(1, 100, 0), newCommitter(1))
defer peer.shutdown()

_, bootCh := bootPeer.g.Accept(remoteStateMsgFilter, true)
_, peerCh := peer.g.Accept(remoteStateMsgFilter, true)

wg := sync.WaitGroup{}
wg.Add(2)

go func() {
msg := <-bootCh
logger.Info("Bootstrap node got message, ", msg)
assert.True(t, msg.GetGossipMessage().GetStateRequest() != nil)
msg.Respond(&proto.GossipMessage{
Content: &proto.GossipMessage_StateResponse{&proto.RemoteStateResponse{nil}},
})
wg.Done()
}()

go func() {
msg := <-peerCh
logger.Info("Peer node got an answer, ", msg)
assert.True(t, msg.GetGossipMessage().GetStateResponse() != nil)
wg.Done()

}()

readyCh := make(chan struct{})
go func() {
wg.Wait()
readyCh <- struct{}{}
}()

time.Sleep(time.Duration(5) * time.Second)
logger.Info("Sending gossip message with remote state request")

chainID := common.ChainID(util.GetTestChainID())

peer.g.Send(&proto.GossipMessage{
Content: &proto.GossipMessage_StateRequest{&proto.RemoteStateRequest{nil}},
}, &comm.RemotePeer{peer.g.PeersOfChannel(chainID)[0].Endpoint, peer.g.PeersOfChannel(chainID)[0].PKIid})
logger.Info("Waiting until peers exchange messages")

select {
case <-readyCh:
{
logger.Info("[XXX]: Done!!!")

}
case <-time.After(time.Duration(10) * time.Second):
{
t.Fail()
}
}
}

func waitUntilTrueOrTimeout(t *testing.T, predicate func() bool, timeout time.Duration) {
ch := make(chan struct{})
go func() {
Expand Down

0 comments on commit 01de0e4

Please sign in to comment.