Skip to content

Commit

Permalink
fix(dot/core): Batch process transaction message. (ChainSafe#1780)
Browse files Browse the repository at this point in the history
* Batch process transaction message.

Co-authored-by: noot <[email protected]>
  • Loading branch information
arijitAD and noot authored Sep 17, 2021
1 parent 4687cf5 commit 0064836
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 22 deletions.
42 changes: 35 additions & 7 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ type (

// NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream.
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)

// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode.
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*batchMessage, err error)
)

type batchMessage struct {
msg NotificationsMessage
peer peer.ID
}

type handshakeReader struct {
hs Handshake
err error
Expand Down Expand Up @@ -141,7 +149,7 @@ func createDecoder(info *notificationsProtocol, handshakeDecoder HandshakeDecode
}
}

func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, messageHandler NotificationsMessageHandler) messageHandler {
func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, messageHandler NotificationsMessageHandler, batchHandler NotificationsMessageBatchHandler) messageHandler {
return func(stream libp2pnetwork.Stream, m Message) error {
if m == nil || info == nil || info.handshakeValidator == nil || messageHandler == nil {
return nil
Expand Down Expand Up @@ -210,18 +218,38 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
"peer", stream.Conn().RemotePeer(),
)

propagate, err := messageHandler(peer, msg)
if err != nil {
return err
var (
propagate bool
err error
msgs []*batchMessage
)
if batchHandler != nil {
msgs, err = batchHandler(peer, msg)
if err != nil {
return err
}

propagate = len(msgs) > 0
} else {
propagate, err = messageHandler(peer, msg)
if err != nil {
return err
}
msgs = append(msgs, &batchMessage{
msg: msg,
peer: peer,
})
}

if !propagate || s.noGossip {
return nil
}

seen := s.gossip.hasSeen(msg)
if !seen {
s.broadcastExcluding(info, peer, msg)
for _, data := range msgs {
seen := s.gossip.hasSeen(data.msg)
if !seen {
s.broadcastExcluding(info, data.peer, data.msg)
}
}

return nil
Expand Down
107 changes: 105 additions & 2 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ChainSafe/gossamer/lib/utils"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -143,7 +144,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounce(t *testing.T) {
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage)
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage, nil)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, handshakeData{
Expand Down Expand Up @@ -176,7 +177,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage)
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage, nil)

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Expand Down Expand Up @@ -316,6 +317,108 @@ func Test_HandshakeTimeout(t *testing.T) {
require.Len(t, connAToB[0].GetStreams(), 0)
}

func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)
config := &Config{
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
TransactionHandler: mockhandler,
}

s := createTestService(t, config)
s.batchSize = 5

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Port: 7002,
NoBootstrap: true,
NoMDNS: true,
}

b := createTestService(t, configB)

txnBatch := make(chan *batchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// don't set handshake data ie. this stream has just been opened
testPeerID := b.host.id()

// connect nodes
addrInfoB := b.host.addrInfo()
err := s.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = s.host.connect(addrInfoB)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

// create info and handler
info := &notificationsProtocol{
protocolID: s.host.protocolID + transactionsID,
getHandshake: s.getTransactionHandshake,
handshakeValidator: validateTransactionHandshake,
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, handshakeData{
received: true,
validated: true,
})
msg := &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 2)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 3)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 4)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)
}

func TestBlockAnnounceHandshakeSize(t *testing.T) {
require.Equal(t, unsafe.Sizeof(BlockAnnounceHandshake{}), reflect.TypeOf(BlockAnnounceHandshake{}).Size())
}
11 changes: 10 additions & 1 deletion dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type Service struct {
// telemetry
telemetryInterval time.Duration
closeCh chan interface{}

batchSize int
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -171,6 +173,7 @@ func NewService(cfg *Config) (*Service, error) {
closeCh: make(chan interface{}),
bufPool: bufPool,
streamManager: newStreamManager(ctx),
batchSize: 100,
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -218,11 +221,15 @@ func (s *Service) Start() error {
s.validateBlockAnnounceHandshake,
decodeBlockAnnounceMessage,
s.handleBlockAnnounceMessage,
nil,
)
if err != nil {
logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err)
}

txnBatch := make(chan *batchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// register transactions protocol
err = s.RegisterNotificationsProtocol(
s.host.protocolID+transactionsID,
Expand All @@ -232,6 +239,7 @@ func (s *Service) Start() error {
validateTransactionHandshake,
decodeTransactionMessage,
s.handleTransactionMessage,
txnBatchHandler,
)
if err != nil {
logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err)
Expand Down Expand Up @@ -420,6 +428,7 @@ func (s *Service) RegisterNotificationsProtocol(
handshakeValidator HandshakeValidator,
messageDecoder MessageDecoder,
messageHandler NotificationsMessageHandler,
batchHandler NotificationsMessageBatchHandler,
) error {
s.notificationsMu.Lock()
defer s.notificationsMu.Unlock()
Expand Down Expand Up @@ -462,7 +471,7 @@ func (s *Service) RegisterNotificationsProtocol(
info := s.notificationsProtocols[messageID]

decoder := createDecoder(info, handshakeDecoder, messageDecoder)
handlerWithValidate := s.createNotificationsMessageHandler(info, messageHandler)
handlerWithValidate := s.createNotificationsMessageHandler(info, messageHandler, batchHandler)

s.host.registerStreamHandler(protocolID, func(stream libp2pnetwork.Stream) {
logger.Trace("received stream", "sub-protocol", protocolID)
Expand Down
33 changes: 33 additions & 0 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,39 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) {
return &transactionHandshake{}, nil
}

func (s *Service) createBatchMessageHandler(txnBatch chan *batchMessage) NotificationsMessageBatchHandler {
return func(peer peer.ID, msg NotificationsMessage) (msgs []*batchMessage, err error) {
data := &batchMessage{
msg: msg,
peer: peer,
}
txnBatch <- data

if len(txnBatch) < s.batchSize {
return nil, nil
}

var propagateMsgs []*batchMessage
for txnData := range txnBatch {
propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg)
if err != nil {
continue
}
if propagate {
propagateMsgs = append(propagateMsgs, &batchMessage{
msg: txnData.msg,
peer: txnData.peer,
})
}
if len(txnBatch) == 0 {
break
}
}
// May be use error to compute peer score.
return propagateMsgs, nil
}
}

func validateTransactionHandshake(_ peer.ID, _ Handshake) error {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions dot/sync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sync

import (
"math/big"
"sync"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -56,8 +57,7 @@ type StorageState interface {
TrieState(root *common.Hash) (*rtstorage.TrieState, error)
LoadCodeHash(*common.Hash) (common.Hash, error)
SetSyncing(bool)
Lock()
Unlock()
sync.Locker
}

// CodeSubstitutedState interface to handle storage of code substitute state
Expand Down
4 changes: 2 additions & 2 deletions lib/babe/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package babe

import (
"math/big"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/types"
Expand Down Expand Up @@ -52,8 +53,7 @@ type BlockState interface {
// StorageState interface for storage state methods
type StorageState interface {
TrieState(hash *common.Hash) (*rtstorage.TrieState, error)
Lock()
Unlock()
sync.Locker
}

// TransactionState is the interface for transaction queue methods
Expand Down
1 change: 1 addition & 0 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *Service) registerProtocol() error {
s.validateHandshake,
s.decodeMessage,
s.handleNetworkMessage,
nil,
)
}

Expand Down
17 changes: 9 additions & 8 deletions lib/grandpa/round_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,15 @@ func (n *testNetwork) SendJustificationRequest(to peer.ID, num uint32) {
}
}

func (n *testNetwork) RegisterNotificationsProtocol(
pid protocol.ID,
messageID byte,
handshakeGetter network.HandshakeGetter,
handshakeDecoder network.HandshakeDecoder,
handshakeValidator network.HandshakeValidator,
messageDecoder network.MessageDecoder,
messageHandler network.NotificationsMessageHandler,
func (*testNetwork) RegisterNotificationsProtocol(
_ protocol.ID,
_ byte,
_ network.HandshakeGetter,
_ network.HandshakeDecoder,
_ network.HandshakeValidator,
_ network.MessageDecoder,
_ network.NotificationsMessageHandler,
_ network.NotificationsMessageBatchHandler,
) error {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions lib/grandpa/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,6 @@ type Network interface {
handshakeValidator network.HandshakeValidator,
messageDecoder network.MessageDecoder,
messageHandler network.NotificationsMessageHandler,
batchHandler network.NotificationsMessageBatchHandler,
) error
}

0 comments on commit 0064836

Please sign in to comment.