Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): Implement time based handle transaction #1942

Merged
merged 18 commits into from
Nov 26, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Minute * 5

// defaultTxnBatchSize is the default size for the transaction batch
defaultTxnBatchSize = 100
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
)

// DefaultBootnodes the default value for Config.Bootnodes
Expand Down Expand Up @@ -104,6 +107,11 @@ type Config struct {
telemetryInterval time.Duration

noPreAllocate bool // internal option

batchSize int // internal option

// Babe slot duration
SlotDuration time.Duration
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}

// build checks the configuration, sets up the private key for the network service,
Expand Down
4 changes: 2 additions & 2 deletions dot/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/utils"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/lib/utils"
)

func newTestDiscovery(t *testing.T, num int) []*discovery {
Expand Down
14 changes: 14 additions & 0 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,17 @@ func (h *host) protocols() []string {
func (h *host) closePeer(peer peer.ID) error {
return h.h.Network().ClosePeer(peer)
}

func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
connToPeer := h.h.Network().ConnsToPeer(p)
for _, c := range connToPeer {
for _, st := range c.GetStreams() {
if st.Protocol() == pID {
err := st.Close()
if err != nil {
logger.Tracef("Failed to close stream", "protocol", pID, "error", err)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
}
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
47 changes: 16 additions & 31 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type (
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)

// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for me to gain understanding, but what is the distinction here between a handshake and a non-handshake message when in batch processing mode, and why do they have to be handled differently?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when a notifications stream is opened (see docs on the different stream types here https://docs.rs/sc-network/0.9.0/sc_network/) the first message each side sends to each other over it is a "handshake" message, which each side validates and if they don't like it they close the stream, otherwise it stays open and the rest of the messages that are sent over the stream (by the opening side only) are the normal messages for that stream protocol, so in this case TransactionMessages

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay that makes sense. I wasn't sure if handshake meant something different in this context, but this sounds like what I would think of as a typical handshake for establishing connection between peers (correct me if I'm misunderstanding though). And to clarify, the streams are one sided in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, After handshake the steam is unidirectional.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's not exactly the same as a handshake for estabilishing connection/public keys, but it's a more high-level stream level handshake

NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*BatchMessage, err error)
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage)
)

// BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go
Expand Down Expand Up @@ -220,46 +220,31 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
logger.Tracef("received message on notifications sub-protocol %s from peer %s, message is: %s",
info.protocolID, stream.Conn().RemotePeer(), msg)

var (
propagate bool
err error
msgs []*BatchMessage
)
if batchHandler != nil {
msgs, err = batchHandler(peer, msg)
if err != nil {
return err
}
batchHandler(peer, msg)
return nil
}

propagate = len(msgs) > 0
} else {
propagate, err = messageHandler(peer, msg)
if err != nil {
return err
}
msgs = append(msgs, &BatchMessage{
msg: msg,
peer: peer,
})
propagate, err := messageHandler(peer, msg)
if err != nil {
return err
}

if !propagate || s.noGossip {
return nil
}

for _, data := range msgs {
seen := s.gossip.hasSeen(data.msg)
if !seen {
s.broadcastExcluding(info, data.peer, data.msg)
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
seen := s.gossip.hasSeen(msg)
if !seen {
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
s.broadcastExcluding(info, peer, msg)
return nil
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
return nil
}
}
Expand Down
66 changes: 36 additions & 30 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"time"
"unsafe"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
)

func TestHandshake_SizeOf(t *testing.T) {
Expand Down Expand Up @@ -320,20 +320,17 @@ func Test_HandshakeTimeout(t *testing.T) {
}

func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
const batchSize = 5
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)
config := &Config{
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
TransactionHandler: mockhandler,
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
batchSize: batchSize,
}

s := createTestService(t, config)
s.batchSize = 5
srvc1 := createTestService(t, config)

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Expand All @@ -342,42 +339,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
NoMDNS: true,
}

b := createTestService(t, configB)
srvc2 := createTestService(t, configB)

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// don't set handshake data ie. this stream has just been opened
testPeerID := b.host.id()
txnBatch := make(chan *BatchMessage, batchSize)
txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch)

// connect nodes
addrInfoB := b.host.addrInfo()
err := s.host.connect(addrInfoB)
addrInfoB := srvc2.host.addrInfo()
err := srvc1.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = s.host.connect(addrInfoB)
err = srvc1.host.connect(addrInfoB)
require.NoError(t, err)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
txnProtocolID := srvc1.host.protocolID + transactionsID
stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

// create info and handler
info := &notificationsProtocol{
protocolID: s.host.protocolID + transactionsID,
getHandshake: s.getTransactionHandshake,
protocolID: txnProtocolID,
getHandshake: srvc1.getTransactionHandshake,
handshakeValidator: validateTransactionHandshake,
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)
handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, handshakeData{
info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{
received: true,
validated: true,
})

msg := &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
Expand Down Expand Up @@ -411,11 +407,21 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 0)
require.Len(t, txnBatch, 5)

// reached batch size limit, below transaction will not be included in batch.
msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
// wait for transaction batch channel to process.
time.Sleep(1300 * time.Millisecond)
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)
Expand Down
8 changes: 4 additions & 4 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ type Service struct {

blockResponseBuf []byte
blockResponseBufMu sync.Mutex

batchSize int
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -138,6 +136,9 @@ func NewService(cfg *Config) (*Service, error) {
connectToPeersTimeout = cfg.DiscoveryInterval
}

if cfg.batchSize == 0 {
cfg.batchSize = defaultTxnBatchSize
}
// create a new host instance
host, err := newHost(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -176,7 +177,6 @@ func NewService(cfg *Config) (*Service, error) {
bufPool: bufPool,
streamManager: newStreamManager(ctx),
blockResponseBuf: make([]byte, maxBlockResponseSize),
batchSize: 100,
}

return network, err
Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *Service) Start() error {
blockAnnounceID, err)
}

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatch := make(chan *BatchMessage, s.cfg.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// register transactions protocol
Expand Down
6 changes: 5 additions & 1 deletion dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func createServiceHelper(t *testing.T, num int) []*Service {

// helper method to create and start a new network service
func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
t.Helper()

if cfg == nil {
basePath := utils.NewTestBasePath(t, "node")

Expand All @@ -84,11 +86,13 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {

if cfg.TransactionHandler == nil {
mocktxhandler := &MockTransactionHandler{}
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil)
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mocktxhandler.On("TransactionsCount").Return(0)
cfg.TransactionHandler = mocktxhandler
}

cfg.SlotDuration = time.Second
qdm12 marked this conversation as resolved.
Show resolved Hide resolved

cfg.ProtocolID = TestProtocolID // default "/gossamer/gssmr/0"

if cfg.LogLvl == 0 {
Expand Down
Loading