diff --git a/network/gossip.go b/network/gossip.go index d588bc7fd..cf9570309 100644 --- a/network/gossip.go +++ b/network/gossip.go @@ -25,6 +25,7 @@ func newGossipService(ctx context.Context, host lp2phost.Host, eventCh chan Even conf *Config, log *logger.SubLogger, ) *gossipService { opts := []lp2pps.Option{ + lp2pps.WithFloodPublish(true), lp2pps.WithMessageSignaturePolicy(lp2pps.StrictNoSign), lp2pps.WithNoAuthor(), lp2pps.WithMessageIdFn(MessageIDFunc), @@ -35,15 +36,17 @@ func newGossipService(ctx context.Context, host lp2phost.Host, eventCh chan Even opts = append(opts, lp2pps.WithPeerExchange(true)) } + gsParams := lp2pps.DefaultGossipSubParams() if conf.IsGossiper { // turn off the mesh in gossiper mode - lp2pps.GossipSubD = 0 - lp2pps.GossipSubDscore = 0 - lp2pps.GossipSubDlo = 0 - lp2pps.GossipSubDhi = 0 - lp2pps.GossipSubDout = 0 - lp2pps.GossipSubDlazy = conf.ScaledMinConns() + gsParams.D = 0 + gsParams.Dscore = 0 + gsParams.Dlo = 0 + gsParams.Dhi = 0 + gsParams.Dout = 0 + gsParams.Dlazy = conf.ScaledMinConns() } + opts = append(opts, lp2pps.WithGossipSubParams(gsParams)) pubsub, err := lp2pps.NewGossipSub(ctx, host, opts...) if err != nil { diff --git a/sync/bundle/bundle.go b/sync/bundle/bundle.go index 09c231c97..e0ebb1c45 100644 --- a/sync/bundle/bundle.go +++ b/sync/bundle/bundle.go @@ -20,8 +20,9 @@ const ( ) type Bundle struct { - Flags int - Message message.Message + Flags int + SequenceNo int + Message message.Message } func NewBundle(msg message.Message) *Bundle { @@ -43,10 +44,15 @@ func (b *Bundle) CompressIt() { b.Flags = util.SetFlag(b.Flags, BundleFlagCompressed) } +func (b *Bundle) SetSequenceNo(seqNo int) { + b.SequenceNo = seqNo +} + type _Bundle struct { Flags int `cbor:"1,keyasint"` MessageType message.Type `cbor:"2,keyasint"` MessageData []byte `cbor:"3,keyasint"` + SequenceNo int `cbor:"4,keyasint"` } func (b *Bundle) Encode() ([]byte, error) { @@ -66,6 +72,7 @@ func (b *Bundle) Encode() ([]byte, error) { Flags: b.Flags, MessageType: b.Message.Type(), MessageData: data, + SequenceNo: b.SequenceNo, } return cbor.Marshal(msg) @@ -95,6 +102,7 @@ func (b *Bundle) Decode(r io.Reader) (int, error) { } b.Flags = bdl.Flags + b.SequenceNo = bdl.SequenceNo b.Message = msg return bytesRead, cbor.Unmarshal(data, msg) } diff --git a/sync/bundle/bundle_test.go b/sync/bundle/bundle_test.go index 7438effd9..b96002bd0 100644 --- a/sync/bundle/bundle_test.go +++ b/sync/bundle/bundle_test.go @@ -81,7 +81,8 @@ func TestDecodeVoteCBOR(t *testing.T) { "035879a101a70101" + "02186403010458200264572d4d6bfcd2140d4f885fd5a32fe42fdbf40551e4ff89f3d235e32b4b92055501c0067d277f2dff" + "99943016d6a0f379cf09846c6f06f60758308ab7aecbe03c4ed5b688bcb7e848baffa62bcbf1a4021522c56693f0a7bbcc1f" + - "e865277556ee59c1f63ba592acfe1b43") + "e865277556ee59c1f63ba592acfe1b43" + + "0401") // SequenceNo d2, _ := hex.DecodeString( "a3" + "01190100" + // flags: 0x0100 (compressed) @@ -89,7 +90,8 @@ func TestDecodeVoteCBOR(t *testing.T) { "0358951f8b08" + "000000000000ff00790086ffa101a7010102186403010458200264572d4d6bfcd2140d4f885fd5a32fe42fdbf40551e4ff89" + "f3d235e32b4b92055501c0067d277f2dff99943016d6a0f379cf09846c6f06f60758308ab7aecbe03c4ed5b688bcb7e848ba" + - "ffa62bcbf1a4021522c56693f0a7bbcc1fe865277556ee59c1f63ba592acfe1b43010000ffff798ce7ec79000000") + "ffa62bcbf1a4021522c56693f0a7bbcc1fe865277556ee59c1f63ba592acfe1b43010000ffff798ce7ec79000000" + + "0401") // SequenceNo bdl1 := new(Bundle) bdl2 := new(Bundle) @@ -102,3 +104,10 @@ func TestDecodeVoteCBOR(t *testing.T) { assert.Equal(t, bdl1.Message, bdl2.Message) assert.Contains(t, bdl1.String(), "vote") } + +func TestSetSequenceNo(t *testing.T) { + bdl := new(Bundle) + bdl.SetSequenceNo(1001) + + assert.Equal(t, 1001, bdl.SequenceNo) +} diff --git a/sync/peerset/peer_set.go b/sync/peerset/peer_set.go index e8f679924..14a9bcf84 100644 --- a/sync/peerset/peer_set.go +++ b/sync/peerset/peer_set.go @@ -25,6 +25,7 @@ type PeerSet struct { sessions map[int]*session.Session nextSessionID int sessionTimeout time.Duration + totalSentBundles int totalSentBytes int64 totalReceivedBytes int64 sentBytes map[message.Type]int64 @@ -326,10 +327,11 @@ func (ps *PeerSet) IncreaseReceivedBytesCounter(pid peer.ID, msgType message.Typ ps.receivedBytes[msgType] += c } -func (ps *PeerSet) IncreaseSentBytesCounter(msgType message.Type, c int64, pid *peer.ID) { +func (ps *PeerSet) IncreaseSentCounters(msgType message.Type, c int64, pid *peer.ID) { ps.lk.Lock() defer ps.lk.Unlock() + ps.totalSentBundles++ ps.totalSentBytes += c ps.sentBytes[msgType] += c @@ -339,6 +341,13 @@ func (ps *PeerSet) IncreaseSentBytesCounter(msgType message.Type, c int64, pid * } } +func (ps *PeerSet) TotalSentBundles() int { + ps.lk.RLock() + defer ps.lk.RUnlock() + + return ps.totalSentBundles +} + func (ps *PeerSet) TotalSentBytes() int64 { ps.lk.RLock() defer ps.lk.RUnlock() diff --git a/sync/peerset/peer_set_test.go b/sync/peerset/peer_set_test.go index 05fc4819d..8880227fd 100644 --- a/sync/peerset/peer_set_test.go +++ b/sync/peerset/peer_set_test.go @@ -75,8 +75,8 @@ func TestPeerSet(t *testing.T) { peerSet.IncreaseReceivedBundlesCounter(pid1) peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeBlocksResponse, 100) peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeTransactions, 150) - peerSet.IncreaseSentBytesCounter(message.TypeBlocksRequest, 200, nil) - peerSet.IncreaseSentBytesCounter(message.TypeBlocksRequest, 250, &pid1) + peerSet.IncreaseSentCounters(message.TypeBlocksRequest, 200, nil) + peerSet.IncreaseSentCounters(message.TypeBlocksRequest, 250, &pid1) peer1 := peerSet.getPeer(pid1) @@ -100,6 +100,7 @@ func TestPeerSet(t *testing.T) { assert.Equal(t, peerSet.TotalSentBytes(), int64(450)) assert.Equal(t, peerSet.SentBytesMessageType(message.TypeBlocksRequest), int64(450)) assert.Equal(t, peerSet.SentBytes(), sentBytes) + assert.Equal(t, peerSet.TotalSentBundles(), 2) }) t.Run("Testing UpdateHeight", func(t *testing.T) { diff --git a/sync/sync.go b/sync/sync.go index 8f7c6ca4a..763d42d66 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -147,6 +147,8 @@ func (sync *synchronizer) prepareBundle(msg message.Message) *bundle.Bundle { // It's localnet and for testing purpose only } + bdl.SetSequenceNo(sync.peerSet.TotalSentBundles()) + return bdl } return nil @@ -167,7 +169,7 @@ func (sync *synchronizer) sendTo(msg message.Message, to peer.ID) { } sync.peerSet.UpdateLastSent(to) - sync.peerSet.IncreaseSentBytesCounter(msg.Type(), int64(len(data)), &to) + sync.peerSet.IncreaseSentCounters(msg.Type(), int64(len(data)), &to) sync.logger.Info("bundle sent", "bundle", bdl, "to", to) } } @@ -194,7 +196,7 @@ func (sync *synchronizer) broadcast(msg message.Message) { } else { sync.logger.Info("broadcasting new bundle", "bundle", bdl) } - sync.peerSet.IncreaseSentBytesCounter(msg.Type(), int64(len(data)), nil) + sync.peerSet.IncreaseSentCounters(msg.Type(), int64(len(data)), nil) } } diff --git a/sync/sync_test.go b/sync/sync_test.go index 86d42b002..1063d5248 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -373,20 +373,35 @@ func TestBroadcastBlockAnnounce(t *testing.T) { t.Run("Should announce the block", func(t *testing.T) { blk, cert := td.GenerateTestBlock(td.RandHeight()) - msg := message.BlockAnnounceMessage{Block: blk, Certificate: cert} + msg := message.NewBlockAnnounceMessage(blk, cert) - td.broadcastCh <- &msg + td.sync.broadcast(msg) td.shouldPublishMessageWithThisType(t, message.TypeBlockAnnounce) }) t.Run("Should NOT announce the block", func(t *testing.T) { blk, cert := td.GenerateTestBlock(td.RandHeight()) - msg := message.BlockAnnounceMessage{Block: blk, Certificate: cert} + msg := message.NewBlockAnnounceMessage(blk, cert) td.sync.cache.AddBlock(blk) - td.broadcastCh <- &msg + td.sync.broadcast(msg) td.shouldNotPublishMessageWithThisType(t, message.TypeBlockAnnounce) }) } + +func TestBundleSequenceNo(t *testing.T) { + td := setup(t, nil) + + msg := message.NewQueryProposalMessage(td.RandHeight(), td.RandValAddress()) + + td.sync.broadcast(msg) + bdl1 := td.shouldPublishMessageWithThisType(t, message.TypeQueryProposal) + assert.Equal(t, 0, bdl1.SequenceNo) + + // Sending the same message again + td.sync.broadcast(msg) + bdl2 := td.shouldPublishMessageWithThisType(t, message.TypeQueryProposal) + assert.Equal(t, 1, bdl2.SequenceNo) +}