Skip to content

Commit

Permalink
[FAB-5849] calibrate state transfer pace
Browse files Browse the repository at this point in the history
Blocks received from peers via state transfer are added to the payload
buffer right away regardless the payload buffer's size.

In cases when state transfer is much faster than the commit process,
blocks pile up in the payload buffer and the peer might be out of memory.

This change set makes the method that handles payload reception from remote
peers to add the payloads through the same code path that receives blocks
from the orderer, which blocks in case the payload buffer is too overpopulated.

Change-Id: I2fc1a916b809311a7d3aa0308b64d2127ad1ee60
Signed-off-by: yacovm <[email protected]>
  • Loading branch information
yacovm committed Sep 23, 2017
1 parent 51d4df6 commit 7de3912
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 18 deletions.
2 changes: 1 addition & 1 deletion gossip/state/mocks/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (g *GossipMock) Gossip(msg *proto.GossipMessage) {
func (g *GossipMock) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
args := g.Called(acceptor, passThrough)
if args.Get(0) == nil {
return nil, args.Get(1).(<-chan proto.ReceivedMessage)
return nil, args.Get(1).(chan proto.ReceivedMessage)
}
return args.Get(0).(<-chan *proto.GossipMessage), nil
}
Expand Down
5 changes: 3 additions & 2 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,10 @@ func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage)
if max < payload.SeqNum {
max = payload.SeqNum
}
err := s.payloads.Push(payload)

err := s.addPayload(payload, blocking)
if err != nil {
logger.Warningf("Payload with sequence number %d was received earlier", payload.SeqNum)
logger.Warningf("Payload with sequence number %d wasn't added to payload buffer: %v", payload.SeqNum, err)
}
}
return max, nil
Expand Down
97 changes: 82 additions & 15 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func TestNilDirectMsg(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
p.s.handleStateRequest(nil)
Expand All @@ -305,7 +305,7 @@ func TestNilAddPayload(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
err := p.s.AddPayload(nil)
Expand All @@ -318,7 +318,7 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
// Simulate a problem in the ledger
Expand All @@ -339,6 +339,77 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
assert.Contains(t, err.Error(), "cannot query ledger")
}

func TestLargeBlockGap(t *testing.T) {
// Scenario: the peer knows of a peer who has a ledger height much higher
// than itself (500 blocks higher).
// The peer needs to ask blocks in a way such that the size of the payload buffer
// never rises above a certain threshold.

mc := &mockCommitter{}
blocksPassedToLedger := make(chan uint64, 200)
mc.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) {
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
})
msgsFromPeer := make(chan proto.ReceivedMessage)
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
membership := []discovery.NetworkMember{
{
PKIid: common.PKIidType("a"),
Endpoint: "a",
Properties: &proto.Properties{
LedgerHeight: 500,
},
}}
g.On("PeersOfChannel", mock.Anything).Return(membership)
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, msgsFromPeer)
g.On("Send", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
msg := arguments.Get(0).(*proto.GossipMessage)
// The peer requested a state request
req := msg.GetStateRequest()
// Construct a skeleton for the response
res := &proto.GossipMessage{
Nonce: msg.Nonce,
Channel: []byte(util.GetTestChainID()),
Content: &proto.GossipMessage_StateResponse{
StateResponse: &proto.RemoteStateResponse{},
},
}
// Populate the response with payloads according to what the peer asked
for seq := req.StartSeqNum; seq <= req.EndSeqNum; seq++ {
rawblock := pcomm.NewBlock(seq, []byte{})
b, _ := pb.Marshal(rawblock)
payload := &proto.Payload{
SeqNum: seq,
Data: b,
}
res.GetStateResponse().Payloads = append(res.GetStateResponse().Payloads, payload)
}
// Finally, send the response down the channel the peer expects to receive it from
sMsg, _ := res.NoopSign()
msgsFromPeer <- &comm.ReceivedMessageImpl{
SignedGossipMessage: sMsg,
}
})
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()

// Process blocks at a speed of 20 Millisecond for each block.
// The imaginative peer that responds to state
// If the payload buffer expands above defMaxBlockDistance*2 + defAntiEntropyBatchSize blocks, fail the test
blockProcessingTime := 20 * time.Millisecond // 10 seconds for total 500 blocks
expectedSequence := 1
for expectedSequence < 500 {
blockSeq := <-blocksPassedToLedger
assert.Equal(t, expectedSequence, int(blockSeq))
// Ensure payload buffer isn't over-populated
assert.True(t, p.s.payloads.Size() <= defMaxBlockDistance*2+defAntiEntropyBatchSize, "payload buffer size is %d", p.s.payloads.Size())
expectedSequence++
time.Sleep(blockProcessingTime)
}
}

func TestOverPopulation(t *testing.T) {
// Scenario: Add to the state provider blocks
// with a gap in between, and ensure that the payload buffer
Expand All @@ -353,7 +424,7 @@ func TestOverPopulation(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
defer p.shutdown()

Expand Down Expand Up @@ -415,7 +486,7 @@ func TestBlockingEnqueue(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
defer p.shutdown()

Expand Down Expand Up @@ -476,7 +547,7 @@ func TestFailures(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(0), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
assert.Panics(t, func() {
newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
Expand Down Expand Up @@ -535,7 +606,7 @@ func TestGossipReception(t *testing.T) {
g.On("Accept", mock.Anything, false).Return(rmc, nil).Run(func(_ mock.Arguments) {
signalChan <- struct{}{}
})
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
mc := &mockCommitter{}
receivedChan := make(chan struct{})
Expand Down Expand Up @@ -576,7 +647,7 @@ func TestMetadataCompatibility(t *testing.T) {
finChan <- struct{}{}
})
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
metaState := common.NewNodeMetastate(5)
b, _ := metaState.Bytes()
defaultPeer := discovery.NetworkMember{
Expand Down Expand Up @@ -1149,12 +1220,8 @@ func TestTransferOfPrivateRWSet(t *testing.T) {
return ch
}

commChannelFactory := func(ch chan proto.ReceivedMessage) <-chan proto.ReceivedMessage {
return ch
}

g.On("Accept", mock.Anything, false).Return(gossipChannelFactory(gossipChannel), nil)
g.On("Accept", mock.Anything, true).Return(nil, commChannelFactory(commChannel))
g.On("Accept", mock.Anything, true).Return(nil, commChannel)

g.On("UpdateChannelMetadata", mock.Anything, mock.Anything)
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
Expand Down Expand Up @@ -1331,7 +1398,7 @@ func (t testPeer) Gossip() <-chan *proto.GossipMessage {
return t.gossipChannel
}

func (t testPeer) Comm() <-chan proto.ReceivedMessage {
func (t testPeer) Comm() chan proto.ReceivedMessage {
return t.commChannel
}

Expand Down Expand Up @@ -1372,7 +1439,7 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) {
Return(nil, peer.Comm()).
Once().
On("Accept", mock.Anything, true).
Return(nil, make(<-chan proto.ReceivedMessage))
Return(nil, make(chan proto.ReceivedMessage))

peer.On("UpdateChannelMetadata", mock.Anything, mock.Anything)
peer.coord.On("Close")
Expand Down

0 comments on commit 7de3912

Please sign in to comment.