From 9384f18671c6a6e79ba74431ab1bb214861c6fc6 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 2 Nov 2021 14:50:53 +0530 Subject: [PATCH 01/15] Implement time based batch processing of transcation message. --- dot/network/config.go | 5 +++ dot/network/host.go | 14 ++++++ dot/network/notifications.go | 47 +++++++------------- dot/network/notifications_test.go | 66 +++++++++++++++------------- dot/network/service.go | 8 ++-- dot/network/service_test.go | 4 +- dot/network/transaction.go | 71 ++++++++++++++++++++----------- 7 files changed, 123 insertions(+), 92 deletions(-) diff --git a/dot/network/config.go b/dot/network/config.go index a8162e1ad5..d4acc28a1f 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -52,6 +52,9 @@ const ( // DefaultDiscoveryInterval is the default interval for searching for DHT peers DefaultDiscoveryInterval = time.Minute * 5 + + // defaultTxnBatchSize is the default size for the transaction batch + defaultTxnBatchSize = 100 ) // DefaultBootnodes the default value for Config.Bootnodes @@ -104,6 +107,8 @@ type Config struct { telemetryInterval time.Duration noPreAllocate bool // internal option + + batchSize int // internal option } // build checks the configuration, sets up the private key for the network service, diff --git a/dot/network/host.go b/dot/network/host.go index c3fcb562ed..f65687b346 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -401,3 +401,17 @@ func (h *host) protocols() []string { func (h *host) closePeer(peer peer.ID) error { return h.h.Network().ClosePeer(peer) } + +func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) { + connToPeer := h.h.Network().ConnsToPeer(p) + for _, c := range connToPeer { + for _, st := range c.GetStreams() { + if st.Protocol() == pID { + err := st.Close() + if err != nil { + logger.Trace("Failed to close stream", "protocol", pID, "error", err) + } + } + } + } +} diff --git a/dot/network/notifications.go b/dot/network/notifications.go index c80ed3d23a..4b584926df 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -58,7 +58,7 @@ type ( NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error) // NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode. - NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*BatchMessage, err error) + NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) ) // BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go @@ -222,46 +222,31 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, "peer", stream.Conn().RemotePeer(), ) - var ( - propagate bool - err error - msgs []*BatchMessage - ) if batchHandler != nil { - msgs, err = batchHandler(peer, msg) - if err != nil { - return err - } + batchHandler(peer, msg) + return nil + } - propagate = len(msgs) > 0 - } else { - propagate, err = messageHandler(peer, msg) - if err != nil { - return err - } - msgs = append(msgs, &BatchMessage{ - msg: msg, - peer: peer, - }) + propagate, err := messageHandler(peer, msg) + if err != nil { + return err } if !propagate || s.noGossip { return nil } - for _, data := range msgs { - seen := s.gossip.hasSeen(data.msg) - if !seen { - s.broadcastExcluding(info, data.peer, data.msg) - } - - // report peer if we get duplicate gossip message. - s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ - Value: peerset.DuplicateGossipValue, - Reason: peerset.DuplicateGossipReason, - }, peer) + seen := s.gossip.hasSeen(msg) + if !seen { + s.broadcastExcluding(info, peer, msg) + return nil } + // report peer if we get duplicate gossip message. + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.DuplicateGossipValue, + Reason: peerset.DuplicateGossipReason, + }, peer) return nil } } diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index cfb8803bb4..f66804eeb3 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -25,13 +25,13 @@ import ( "time" "unsafe" - "github.com/ChainSafe/gossamer/dot/types" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/lib/utils" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/utils" ) func TestHandshake_SizeOf(t *testing.T) { @@ -320,20 +320,17 @@ func Test_HandshakeTimeout(t *testing.T) { } func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { + const batchSize = 5 basePath := utils.NewTestBasePath(t, "nodeA") - mockhandler := &MockTransactionHandler{} - mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil) - mockhandler.On("TransactionsCount").Return(0) config := &Config{ - BasePath: basePath, - Port: 7001, - NoBootstrap: true, - NoMDNS: true, - TransactionHandler: mockhandler, + BasePath: basePath, + Port: 7001, + NoBootstrap: true, + NoMDNS: true, + batchSize: batchSize, } - s := createTestService(t, config) - s.batchSize = 5 + srvc1 := createTestService(t, config) configB := &Config{ BasePath: utils.NewTestBasePath(t, "nodeB"), @@ -342,42 +339,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { NoMDNS: true, } - b := createTestService(t, configB) + srvc2 := createTestService(t, configB) - txnBatch := make(chan *BatchMessage, s.batchSize) - txnBatchHandler := s.createBatchMessageHandler(txnBatch) - - // don't set handshake data ie. this stream has just been opened - testPeerID := b.host.id() + txnBatch := make(chan *BatchMessage, batchSize) + txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch) // connect nodes - addrInfoB := b.host.addrInfo() - err := s.host.connect(addrInfoB) + addrInfoB := srvc2.host.addrInfo() + err := srvc1.host.connect(addrInfoB) if failedToDial(err) { time.Sleep(TestBackoffTimeout) - err = s.host.connect(addrInfoB) + err = srvc1.host.connect(addrInfoB) + require.NoError(t, err) } require.NoError(t, err) - stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID) + txnProtocolID := srvc1.host.protocolID + transactionsID + stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID) require.NoError(t, err) - require.Len(t, txnBatch, 0) // create info and handler info := ¬ificationsProtocol{ - protocolID: s.host.protocolID + transactionsID, - getHandshake: s.getTransactionHandshake, + protocolID: txnProtocolID, + getHandshake: srvc1.getTransactionHandshake, handshakeValidator: validateTransactionHandshake, inboundHandshakeData: new(sync.Map), outboundHandshakeData: new(sync.Map), } - handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler) + handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler) // set handshake data to received - info.inboundHandshakeData.Store(testPeerID, handshakeData{ + info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{ received: true, validated: true, }) + msg := &TransactionMessage{ Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, } @@ -411,11 +407,21 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { } err = handler(stream, msg) require.NoError(t, err) - require.Len(t, txnBatch, 0) + require.Len(t, txnBatch, 5) + + // reached batch size limit, below transaction will not be included in batch. + msg = &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 5) msg = &TransactionMessage{ Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, } + // wait for transaction batch channel to process. + time.Sleep(1300 * time.Millisecond) err = handler(stream, msg) require.NoError(t, err) require.Len(t, txnBatch, 1) diff --git a/dot/network/service.go b/dot/network/service.go index bca1ddb84f..f02630d62d 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -103,8 +103,6 @@ type Service struct { blockResponseBuf []byte blockResponseBufMu sync.Mutex - - batchSize int } // NewService creates a new network service from the configuration and message channels @@ -141,6 +139,9 @@ func NewService(cfg *Config) (*Service, error) { connectToPeersTimeout = cfg.DiscoveryInterval } + if cfg.batchSize == 0 { + cfg.batchSize = defaultTxnBatchSize + } // create a new host instance host, err := newHost(ctx, cfg) if err != nil { @@ -179,7 +180,6 @@ func NewService(cfg *Config) (*Service, error) { bufPool: bufPool, streamManager: newStreamManager(ctx), blockResponseBuf: make([]byte, maxBlockResponseSize), - batchSize: 100, } return network, err @@ -227,7 +227,7 @@ func (s *Service) Start() error { logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err) } - txnBatch := make(chan *BatchMessage, s.batchSize) + txnBatch := make(chan *BatchMessage, s.cfg.batchSize) txnBatchHandler := s.createBatchMessageHandler(txnBatch) // register transactions protocol diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 56a5a85ee4..23c5d31a95 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -66,6 +66,8 @@ func createServiceHelper(t *testing.T, num int) []*Service { // helper method to create and start a new network service func createTestService(t *testing.T, cfg *Config) (srvc *Service) { + t.Helper() + if cfg == nil { basePath := utils.NewTestBasePath(t, "node") @@ -84,7 +86,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { if cfg.TransactionHandler == nil { mocktxhandler := &MockTransactionHandler{} - mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil) + mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil) mocktxhandler.On("TransactionsCount").Return(0) cfg.TransactionHandler = mocktxhandler } diff --git a/dot/network/transaction.go b/dot/network/transaction.go index f82aa10efb..0687339a6e 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -19,12 +19,13 @@ package network import ( "errors" "fmt" + "time" + + "github.com/libp2p/go-libp2p-core/peer" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/pkg/scale" - - "github.com/libp2p/go-libp2p-core/peer" ) var ( @@ -119,36 +120,54 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) { return &transactionHandshake{}, nil } -func (s *Service) createBatchMessageHandler(txnBatch chan *BatchMessage) NotificationsMessageBatchHandler { - return func(peer peer.ID, msg NotificationsMessage) (msgs []*BatchMessage, err error) { +func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler { + go func() { + protocolID := s.host.protocolID + transactionsID + ticker := time.NewTicker(1 * time.Second) + + for { + out: + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + innerTicker := time.NewTicker(300 * time.Millisecond) + for { + select { + case <-innerTicker.C: + break out + case txnMsg := <-txnBatchCh: + propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) + if err != nil { + s.host.closeProtocolStream(protocolID, txnMsg.peer) + continue + } + + if s.noGossip || !propagate { + continue + } + + if !s.gossip.hasSeen(txnMsg.msg) { + s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg) + } + } + } + } + } + }() + + return func(peer peer.ID, msg NotificationsMessage) { data := &BatchMessage{ msg: msg, peer: peer, } - txnBatch <- data - - if len(txnBatch) < s.batchSize { - return nil, nil - } - var propagateMsgs []*BatchMessage - for txnData := range txnBatch { - propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg) - if err != nil { - continue - } - if propagate { - propagateMsgs = append(propagateMsgs, &BatchMessage{ - msg: txnData.msg, - peer: txnData.peer, - }) - } - if len(txnBatch) == 0 { - break - } + select { + case txnBatchCh <- data: + case <-time.After(time.Millisecond * 200): + logger.Debug("transaction message not included into batch") + return } - // May be use error to compute peer score. - return propagateMsgs, nil } } From 5df9fe6555a8b2adf590068a11f8c0edc69ef11f Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 9 Nov 2021 17:33:39 +0530 Subject: [PATCH 02/15] Address comments. --- dot/network/config.go | 3 +++ dot/network/discovery_test.go | 4 ++-- dot/network/service_test.go | 2 ++ dot/network/transaction.go | 7 +++---- dot/services.go | 7 +++++++ 5 files changed, 17 insertions(+), 6 deletions(-) diff --git a/dot/network/config.go b/dot/network/config.go index d4acc28a1f..f87d8099bb 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -109,6 +109,9 @@ type Config struct { noPreAllocate bool // internal option batchSize int // internal option + + // Babe slot duration + SlotDuration time.Duration } // build checks the configuration, sets up the private key for the network service, diff --git a/dot/network/discovery_test.go b/dot/network/discovery_test.go index 6ea389f80a..4822075e47 100644 --- a/dot/network/discovery_test.go +++ b/dot/network/discovery_test.go @@ -22,12 +22,12 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/lib/utils" badger "github.com/ipfs/go-ds-badger2" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" - "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/lib/utils" ) func newTestDiscovery(t *testing.T, num int) []*discovery { diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 23c5d31a95..ef88c987f9 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -91,6 +91,8 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { cfg.TransactionHandler = mocktxhandler } + cfg.SlotDuration = time.Second + cfg.ProtocolID = TestProtocolID // default "/gossamer/gssmr/0" if cfg.LogLvl == 0 { diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 0687339a6e..b5afcd9812 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -123,7 +123,7 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) { func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler { go func() { protocolID := s.host.protocolID + transactionsID - ticker := time.NewTicker(1 * time.Second) + ticker := time.NewTicker(s.cfg.SlotDuration) for { out: @@ -131,10 +131,10 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif case <-s.ctx.Done(): return case <-ticker.C: - innerTicker := time.NewTicker(300 * time.Millisecond) + timeOut := time.NewTimer(s.cfg.SlotDuration / 3) for { select { - case <-innerTicker.C: + case <-timeOut.C: break out case txnMsg := <-txnBatchCh: propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) @@ -166,7 +166,6 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif case txnBatchCh <- data: case <-time.After(time.Millisecond * 200): logger.Debug("transaction message not included into batch") - return } } } diff --git a/dot/services.go b/dot/services.go index 423c00807c..5aa53bd715 100644 --- a/dot/services.go +++ b/dot/services.go @@ -22,6 +22,7 @@ import ( "path/filepath" "github.com/ChainSafe/chaindb" + "github.com/ChainSafe/gossamer/dot/core" "github.com/ChainSafe/gossamer/dot/digest" "github.com/ChainSafe/gossamer/dot/network" @@ -261,6 +262,11 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi "nomdns", cfg.Network.NoMDNS, ) + slotDuration, err := stateSrvc.Epoch.GetSlotDuration() + if err != nil { + return nil, err + } + // network service configuation networkConfig := network.Config{ LogLvl: cfg.Log.NetworkLvl, @@ -277,6 +283,7 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi PublishMetrics: cfg.Global.PublishMetrics, PersistentPeers: cfg.Network.PersistentPeers, DiscoveryInterval: cfg.Network.DiscoveryInterval, + SlotDuration: slotDuration, } networkSrvc, err := network.NewService(&networkConfig) From 37af18d963a4e0950209b213339e15e870327532 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Wed, 10 Nov 2021 23:01:33 +0530 Subject: [PATCH 03/15] Fix failing tests. --- dot/rpc/modules/system_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/rpc/modules/system_test.go b/dot/rpc/modules/system_test.go index 013675da80..b54a47919a 100644 --- a/dot/rpc/modules/system_test.go +++ b/dot/rpc/modules/system_test.go @@ -64,6 +64,7 @@ func newNetworkService(t *testing.T) *network.Service { BasePath: testDir, Syncer: network.NewMockSyncer(), TransactionHandler: network.NewMockTransactionHandler(), + SlotDuration: time.Second, } srv, err := network.NewService(cfg) From e90aed82a4c746db087bbaa32c696540ed1264b5 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Wed, 10 Nov 2021 23:04:57 +0530 Subject: [PATCH 04/15] Address comments. --- dot/network/transaction.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/transaction.go b/dot/network/transaction.go index b5afcd9812..a1d0fb64ce 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -165,7 +165,7 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif select { case txnBatchCh <- data: case <-time.After(time.Millisecond * 200): - logger.Debug("transaction message not included into batch") + logger.Debug("transaction message not included into batch", "peer", peer.String(), "msg", msg.String()) } } } From 14e65632eedef68604ce1229cdbc103f75ab27b7 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Thu, 11 Nov 2021 22:20:44 +0530 Subject: [PATCH 05/15] Address comments. --- dot/network/transaction.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/network/transaction.go b/dot/network/transaction.go index a1d0fb64ce..b24a41e4b9 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -124,6 +124,7 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif go func() { protocolID := s.host.protocolID + transactionsID ticker := time.NewTicker(s.cfg.SlotDuration) + defer ticker.Stop() for { out: From bbe1b890a50c5e35575d363d79d6ccf9c71a799d Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Fri, 12 Nov 2021 18:01:54 +0530 Subject: [PATCH 06/15] Fix build --- dot/network/host.go | 2 +- dot/network/transaction.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/network/host.go b/dot/network/host.go index fd364d1aee..e1ae32f497 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -402,7 +402,7 @@ func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) { if st.Protocol() == pID { err := st.Close() if err != nil { - logger.Trace("Failed to close stream", "protocol", pID, "error", err) + logger.Tracef("Failed to close stream", "protocol", pID, "error", err) } } } diff --git a/dot/network/transaction.go b/dot/network/transaction.go index b24a41e4b9..0491ca23a2 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -166,7 +166,7 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif select { case txnBatchCh <- data: case <-time.After(time.Millisecond * 200): - logger.Debug("transaction message not included into batch", "peer", peer.String(), "msg", msg.String()) + logger.Debugf("transaction message not included into batch", "peer", peer.String(), "msg", msg.String()) } } } From 61910bcfb5177ea2b045b9d41fcba8dcbdda3797 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 23 Nov 2021 14:55:21 +0530 Subject: [PATCH 07/15] Address comments --- dot/network/config.go | 5 +++-- dot/network/host.go | 11 ++++++----- dot/network/notifications.go | 6 +++--- dot/network/transaction.go | 14 ++++++++++---- dot/rpc/modules/system_test.go | 11 ++++++----- dot/services.go | 2 +- 6 files changed, 29 insertions(+), 20 deletions(-) diff --git a/dot/network/config.go b/dot/network/config.go index a00fcce65e..52f2e3ce38 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -21,8 +21,9 @@ import ( "path" "time" - "github.com/ChainSafe/gossamer/internal/log" "github.com/libp2p/go-libp2p-core/crypto" + + "github.com/ChainSafe/gossamer/internal/log" ) const ( @@ -110,7 +111,7 @@ type Config struct { batchSize int // internal option - // Babe slot duration + // SlotDuration is slot duration to produce block in milliseconds SlotDuration time.Duration } diff --git a/dot/network/host.go b/dot/network/host.go index e1ae32f497..ca62a6a064 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -399,11 +399,12 @@ func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) { connToPeer := h.h.Network().ConnsToPeer(p) for _, c := range connToPeer { for _, st := range c.GetStreams() { - if st.Protocol() == pID { - err := st.Close() - if err != nil { - logger.Tracef("Failed to close stream", "protocol", pID, "error", err) - } + if st.Protocol() != pID { + continue + } + err := st.Close() + if err != nil { + logger.Tracef("Failed to close stream", "protocol", pID, "error", err) } } } diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 52f9fa2479..9e4f1990a0 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -22,10 +22,11 @@ import ( "sync" "time" - "github.com/ChainSafe/gossamer/dot/peerset" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + + "github.com/ChainSafe/gossamer/dot/peerset" ) var ( @@ -234,8 +235,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, return nil } - seen := s.gossip.hasSeen(msg) - if !seen { + if !s.gossip.hasSeen(msg) { s.broadcastExcluding(info, peer, msg) return nil } diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 0491ca23a2..698b47f8df 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -127,16 +127,17 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif defer ticker.Stop() for { - out: select { case <-s.ctx.Done(): return case <-ticker.C: timeOut := time.NewTimer(s.cfg.SlotDuration / 3) - for { + var completed bool + for !completed { select { case <-timeOut.C: - break out + completed = true + break case txnMsg := <-txnBatchCh: propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) if err != nil { @@ -163,9 +164,14 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif peer: peer, } + timeOut := time.NewTimer(time.Millisecond * 200) + select { case txnBatchCh <- data: - case <-time.After(time.Millisecond * 200): + if !timeOut.Stop() { + <-timeOut.C + } + case <-timeOut.C: logger.Debugf("transaction message not included into batch", "peer", peer.String(), "msg", msg.String()) } } diff --git a/dot/rpc/modules/system_test.go b/dot/rpc/modules/system_test.go index 0050c13339..5f4c05e44c 100644 --- a/dot/rpc/modules/system_test.go +++ b/dot/rpc/modules/system_test.go @@ -25,6 +25,11 @@ import ( "testing" "time" + "github.com/btcsuite/btcutil/base58" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/ChainSafe/gossamer/dot/core" coremocks "github.com/ChainSafe/gossamer/dot/core/mocks" "github.com/ChainSafe/gossamer/dot/network" @@ -40,10 +45,6 @@ import ( "github.com/ChainSafe/gossamer/lib/transaction" "github.com/ChainSafe/gossamer/lib/trie" "github.com/ChainSafe/gossamer/pkg/scale" - "github.com/btcsuite/btcutil/base58" - "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) var ( @@ -63,7 +64,7 @@ func newNetworkService(t *testing.T) *network.Service { BasePath: testDir, Syncer: network.NewMockSyncer(), TransactionHandler: network.NewMockTransactionHandler(), - SlotDuration: time.Second, + SlotDuration: time.Second, } srv, err := network.NewService(cfg) diff --git a/dot/services.go b/dot/services.go index f2845a59a1..636939fda8 100644 --- a/dot/services.go +++ b/dot/services.go @@ -280,7 +280,7 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi PublishMetrics: cfg.Global.PublishMetrics, PersistentPeers: cfg.Network.PersistentPeers, DiscoveryInterval: cfg.Network.DiscoveryInterval, - SlotDuration: slotDuration, + SlotDuration: slotDuration, } networkSrvc, err := network.NewService(&networkConfig) From 85fc57d19fa09f9cde26d06b4b5f1ffd163fe327 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 23 Nov 2021 14:59:18 +0530 Subject: [PATCH 08/15] Address lint issues --- dot/network/transaction.go | 1 - dot/services.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 250daf15f3..af07226467 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -124,7 +124,6 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif select { case <-timeOut.C: completed = true - break case txnMsg := <-txnBatchCh: propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) if err != nil { diff --git a/dot/services.go b/dot/services.go index 77f20d4375..16bf500c79 100644 --- a/dot/services.go +++ b/dot/services.go @@ -269,7 +269,7 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi PublishMetrics: cfg.Global.PublishMetrics, PersistentPeers: cfg.Network.PersistentPeers, DiscoveryInterval: cfg.Network.DiscoveryInterval, - SlotDuration: slotDuration, + SlotDuration: slotDuration, } networkSrvc, err := network.NewService(&networkConfig) From 45d69c2824f8f06c3f6d64a44f7fdafcb81f0dc2 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 23 Nov 2021 15:11:23 +0530 Subject: [PATCH 09/15] Minor nit --- dot/network/service.go | 9 +++++---- dot/network/service_test.go | 2 +- dot/rpc/modules/system_test.go | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/dot/network/service.go b/dot/network/service.go index 2e363e6c44..ca04ab2abd 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -12,16 +12,17 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/metrics" + libp2pnetwork "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/services" - "github.com/ethereum/go-ethereum/metrics" - libp2pnetwork "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" ) const ( diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 4b508e968f..44dee787d2 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -78,7 +78,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { cfg.TransactionHandler = mocktxhandler } - cfg.SlotDuration = time.Second + cfg.SlotDuration = time.Millisecond * 100 cfg.ProtocolID = TestProtocolID // default "/gossamer/gssmr/0" diff --git a/dot/rpc/modules/system_test.go b/dot/rpc/modules/system_test.go index 8e73b31d69..ff0c2bd1bc 100644 --- a/dot/rpc/modules/system_test.go +++ b/dot/rpc/modules/system_test.go @@ -51,7 +51,7 @@ func newNetworkService(t *testing.T) *network.Service { BasePath: testDir, Syncer: network.NewMockSyncer(), TransactionHandler: network.NewMockTransactionHandler(), - SlotDuration: time.Second, + SlotDuration: time.Second, } srv, err := network.NewService(cfg) From 464f2bccfe4e6ea70bff6c2c414dbc0844bb3df6 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 23 Nov 2021 18:52:11 +0530 Subject: [PATCH 10/15] Increase slot duration time in test --- dot/network/service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 44dee787d2..4b508e968f 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -78,7 +78,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { cfg.TransactionHandler = mocktxhandler } - cfg.SlotDuration = time.Millisecond * 100 + cfg.SlotDuration = time.Second cfg.ProtocolID = TestProtocolID // default "/gossamer/gssmr/0" From 07f1a35eb8076b051b74b06e972d5cab3c103600 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Wed, 24 Nov 2021 21:00:14 +0530 Subject: [PATCH 11/15] Address comments --- dot/network/config.go | 3 +- dot/network/host.go | 2 +- dot/network/transaction.go | 79 ++++++++++++++++++++------------------ dot/services.go | 2 +- 4 files changed, 45 insertions(+), 41 deletions(-) diff --git a/dot/network/config.go b/dot/network/config.go index 721597734d..ba670ecec4 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -41,7 +41,6 @@ const ( // DefaultDiscoveryInterval is the default interval for searching for DHT peers DefaultDiscoveryInterval = time.Minute * 5 - // defaultTxnBatchSize is the default size for the transaction batch defaultTxnBatchSize = 100 ) @@ -98,7 +97,7 @@ type Config struct { batchSize int // internal option - // SlotDuration is slot duration to produce block in milliseconds + // SlotDuration is slot duration to produce block SlotDuration time.Duration } diff --git a/dot/network/host.go b/dot/network/host.go index 143fa4ce0f..2ee40239d2 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -391,7 +391,7 @@ func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) { } err := st.Close() if err != nil { - logger.Tracef("Failed to close stream", "protocol", pID, "error", err) + logger.Tracef("Failed to close stream for protocol %s: %s", pID, err) } } } diff --git a/dot/network/transaction.go b/dot/network/transaction.go index af07226467..09134fcd8c 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -20,6 +20,9 @@ var ( _ NotificationsMessage = &transactionHandshake{} ) +// txnBatchChTimeout is the timeout for adding a transaction to the batch processing channel +const txnBatchChTimeout = time.Millisecond * 200 + // TransactionMessage is a network message that is sent to notify of new transactions entering the network type TransactionMessage struct { Extrinsics []types.Extrinsic @@ -107,42 +110,44 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) { return &transactionHandshake{}, nil } -func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler { - go func() { - protocolID := s.host.protocolID + transactionsID - ticker := time.NewTicker(s.cfg.SlotDuration) - defer ticker.Stop() - - for { - select { - case <-s.ctx.Done(): - return - case <-ticker.C: - timeOut := time.NewTimer(s.cfg.SlotDuration / 3) - var completed bool - for !completed { - select { - case <-timeOut.C: - completed = true - case txnMsg := <-txnBatchCh: - propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) - if err != nil { - s.host.closeProtocolStream(protocolID, txnMsg.peer) - continue - } - - if s.noGossip || !propagate { - continue - } - - if !s.gossip.hasSeen(txnMsg.msg) { - s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg) - } +func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage) { + protocolID := s.host.protocolID + transactionsID + ticker := time.NewTicker(s.cfg.SlotDuration) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + timeOut := time.NewTimer(s.cfg.SlotDuration / 3) + var completed bool + for !completed { + select { + case <-timeOut.C: + completed = true + case txnMsg := <-txnBatchCh: + propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) + if err != nil { + s.host.closeProtocolStream(protocolID, txnMsg.peer) + continue + } + + if s.noGossip || !propagate { + continue + } + + if !s.gossip.hasSeen(txnMsg.msg) { + s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg) } } } } - }() + } +} + +func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler { + go s.startTxnBatchProcessing(txnBatchCh) return func(peer peer.ID, msg NotificationsMessage) { data := &BatchMessage{ @@ -150,15 +155,15 @@ func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) Notif peer: peer, } - timeOut := time.NewTimer(time.Millisecond * 200) + timer := time.NewTimer(txnBatchChTimeout) select { case txnBatchCh <- data: - if !timeOut.Stop() { - <-timeOut.C + if !timer.Stop() { + <-timer.C } - case <-timeOut.C: - logger.Debugf("transaction message not included into batch", "peer", peer.String(), "msg", msg.String()) + case <-timer.C: + logger.Debugf("transaction message %s for peer %s not included into batch", msg, peer) } } } diff --git a/dot/services.go b/dot/services.go index 16bf500c79..97bfedf5c7 100644 --- a/dot/services.go +++ b/dot/services.go @@ -250,7 +250,7 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi slotDuration, err := stateSrvc.Epoch.GetSlotDuration() if err != nil { - return nil, err + return nil, fmt.Errorf("cannot get slot duration: %w", err) } // network service configuation From 3dd07abf09cf5f05e955fbd2bf26602e454aeb71 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Fri, 26 Nov 2021 17:49:24 +0530 Subject: [PATCH 12/15] Address comments. --- dot/network/config.go | 2 +- dot/network/transaction.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dot/network/config.go b/dot/network/config.go index f5a30b0872..b486c5d007 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -99,7 +99,7 @@ type Config struct { batchSize int // internal option - // SlotDuration is slot duration to produce block + // SlotDuration is the slot duration to produce a block SlotDuration time.Duration } diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 09134fcd8c..cad849f3d5 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -120,12 +120,12 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage) { case <-s.ctx.Done(): return case <-ticker.C: - timeOut := time.NewTimer(s.cfg.SlotDuration / 3) - var completed bool - for !completed { + timer := time.NewTimer(s.cfg.SlotDuration / 3) + var timedOut bool + for !timedOut { select { - case <-timeOut.C: - completed = true + case <-timer.C: + timedOut = true case txnMsg := <-txnBatchCh: propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) if err != nil { From 993b4235e459e6b46cd367838d9b5b2153b46ebb Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Fri, 26 Nov 2021 17:56:30 +0530 Subject: [PATCH 13/15] Fix mock --- dot/network/mock_transaction_handler.go | 14 +++++++------- lib/grandpa/mocks/network.go | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dot/network/mock_transaction_handler.go b/dot/network/mock_transaction_handler.go index b36752352a..5a290bf6a4 100644 --- a/dot/network/mock_transaction_handler.go +++ b/dot/network/mock_transaction_handler.go @@ -12,20 +12,20 @@ type MockTransactionHandler struct { mock.Mock } -// HandleTransactionMessage provides a mock function with given fields: _a0 -func (_m *MockTransactionHandler) HandleTransactionMessage(_ peer.ID, _a0 *TransactionMessage) (bool, error) { - ret := _m.Called(_a0) +// HandleTransactionMessage provides a mock function with given fields: _a0, _a1 +func (_m *MockTransactionHandler) HandleTransactionMessage(_a0 peer.ID, _a1 *TransactionMessage) (bool, error) { + ret := _m.Called(_a0, _a1) var r0 bool - if rf, ok := ret.Get(0).(func(*TransactionMessage) bool); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(peer.ID, *TransactionMessage) bool); ok { + r0 = rf(_a0, _a1) } else { r0 = ret.Get(0).(bool) } var r1 error - if rf, ok := ret.Get(1).(func(*TransactionMessage) error); ok { - r1 = rf(_a0) + if rf, ok := ret.Get(1).(func(peer.ID, *TransactionMessage) error); ok { + r1 = rf(_a0, _a1) } else { r1 = ret.Error(1) } diff --git a/lib/grandpa/mocks/network.go b/lib/grandpa/mocks/network.go index 9ca106574c..e33e9f3629 100644 --- a/lib/grandpa/mocks/network.go +++ b/lib/grandpa/mocks/network.go @@ -22,11 +22,11 @@ func (_m *Network) GossipMessage(msg network.NotificationsMessage) { } // RegisterNotificationsProtocol provides a mock function with given fields: sub, messageID, handshakeGetter, handshakeDecoder, handshakeValidator, messageDecoder, messageHandler, batchHandler -func (_m *Network) RegisterNotificationsProtocol(sub protocol.ID, messageID byte, handshakeGetter func() (network.Handshake, error), handshakeDecoder func([]byte) (network.Handshake, error), handshakeValidator func(peer.ID, network.Handshake) error, messageDecoder func([]byte) (network.NotificationsMessage, error), messageHandler func(peer.ID, network.NotificationsMessage) (bool, error), batchHandler func(peer.ID, network.NotificationsMessage) ([]*network.BatchMessage, error)) error { +func (_m *Network) RegisterNotificationsProtocol(sub protocol.ID, messageID byte, handshakeGetter func() (network.Handshake, error), handshakeDecoder func([]byte) (network.Handshake, error), handshakeValidator func(peer.ID, network.Handshake) error, messageDecoder func([]byte) (network.NotificationsMessage, error), messageHandler func(peer.ID, network.NotificationsMessage) (bool, error), batchHandler func(peer.ID, network.NotificationsMessage)) error { ret := _m.Called(sub, messageID, handshakeGetter, handshakeDecoder, handshakeValidator, messageDecoder, messageHandler, batchHandler) var r0 error - if rf, ok := ret.Get(0).(func(protocol.ID, byte, func() (network.Handshake, error), func([]byte) (network.Handshake, error), func(peer.ID, network.Handshake) error, func([]byte) (network.NotificationsMessage, error), func(peer.ID, network.NotificationsMessage) (bool, error), func(peer.ID, network.NotificationsMessage) ([]*network.BatchMessage, error)) error); ok { + if rf, ok := ret.Get(0).(func(protocol.ID, byte, func() (network.Handshake, error), func([]byte) (network.Handshake, error), func(peer.ID, network.Handshake) error, func([]byte) (network.NotificationsMessage, error), func(peer.ID, network.NotificationsMessage) (bool, error), func(peer.ID, network.NotificationsMessage)) error); ok { r0 = rf(sub, messageID, handshakeGetter, handshakeDecoder, handshakeValidator, messageDecoder, messageHandler, batchHandler) } else { r0 = ret.Error(0) From a5dcfdeba3c7fbe8cf1fc5b301bdc42b4a972240 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Fri, 26 Nov 2021 18:00:48 +0530 Subject: [PATCH 14/15] Fix mock --- dot/network/service_test.go | 5 ++++- dot/network/test_helpers.go | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 4b508e968f..a33dd8dec9 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -73,7 +73,10 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { if cfg.TransactionHandler == nil { mocktxhandler := &MockTransactionHandler{} - mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil) + mocktxhandler.On("HandleTransactionMessage", + mock.AnythingOfType("peer.ID"), + mock.AnythingOfType("*network.TransactionMessage")). + Return(true, nil) mocktxhandler.On("TransactionsCount").Return(0) cfg.TransactionHandler = mocktxhandler } diff --git a/dot/network/test_helpers.go b/dot/network/test_helpers.go index c428aad256..73c2fbcad8 100644 --- a/dot/network/test_helpers.go +++ b/dot/network/test_helpers.go @@ -8,10 +8,11 @@ import ( "io" "math/big" + "github.com/stretchr/testify/mock" + "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/common/variadic" - "github.com/stretchr/testify/mock" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -75,7 +76,7 @@ func NewMockTransactionHandler() *MockTransactionHandler { mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("peer.ID"), mock.AnythingOfType("*network.TransactionMessage")). - Return(nil) + Return(true, nil) mocktxhandler.On("TransactionsCount").Return(0) return mocktxhandler } From a2971549b631b153abbbe2703843b170f7b91a62 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Fri, 26 Nov 2021 18:02:22 +0530 Subject: [PATCH 15/15] Fix lint --- dot/network/notifications.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 236caac66c..b1b0479f92 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -42,7 +42,8 @@ type ( // NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream. NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error) - // NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode. + // NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications + // stream in batch processing mode. NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) )