diff --git a/dot/network/config.go b/dot/network/config.go index 04b717af4f..b486c5d007 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -8,8 +8,9 @@ import ( "path" "time" - "github.com/ChainSafe/gossamer/internal/log" "github.com/libp2p/go-libp2p-core/crypto" + + "github.com/ChainSafe/gossamer/internal/log" ) const ( @@ -39,6 +40,8 @@ const ( // DefaultDiscoveryInterval is the default interval for searching for DHT peers DefaultDiscoveryInterval = time.Minute * 5 + + defaultTxnBatchSize = 100 ) // DefaultBootnodes the default value for Config.Bootnodes @@ -93,6 +96,11 @@ type Config struct { telemetryInterval time.Duration noPreAllocate bool // internal option + + batchSize int // internal option + + // SlotDuration is the slot duration to produce a block + SlotDuration time.Duration } // build checks the configuration, sets up the private key for the network service, diff --git a/dot/network/discovery_test.go b/dot/network/discovery_test.go index 023cb38b03..7c9bc856e0 100644 --- a/dot/network/discovery_test.go +++ b/dot/network/discovery_test.go @@ -9,12 +9,12 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/lib/utils" badger "github.com/ipfs/go-ds-badger2" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" - "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/lib/utils" ) func newTestDiscovery(t *testing.T, num int) []*discovery { diff --git a/dot/network/host.go b/dot/network/host.go index 4182d825f5..bd659e3e63 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -394,3 +394,18 @@ func (h *host) protocols() []string { func (h *host) closePeer(peer peer.ID) error { return h.h.Network().ClosePeer(peer) } + +func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) { + connToPeer := h.h.Network().ConnsToPeer(p) + for _, c := range connToPeer { + for _, st := range c.GetStreams() { + if st.Protocol() != pID { + continue + } + err := st.Close() + if err != nil { + logger.Tracef("Failed to close stream for protocol %s: %s", pID, err) + } + } + } +} diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 313f846500..b1b0479f92 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -10,12 +10,12 @@ import ( "sync" "time" - "github.com/ChainSafe/gossamer/dot/peerset" - "github.com/libp2p/go-libp2p-core/mux" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + + "github.com/ChainSafe/gossamer/dot/peerset" ) const handshakeTimeout = time.Second * 10 @@ -42,10 +42,9 @@ type ( // NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream. NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error) - // NotificationsMessageBatchHandler is called when a (non-handshake) - // message is received over a notifications stream in batch processing mode. - NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) ( - batchMsgs []*BatchMessage, err error) + // NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications + // stream in batch processing mode. + NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) ) // BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go @@ -223,47 +222,30 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, logger.Tracef("received message on notifications sub-protocol %s from peer %s, message is: %s", info.protocolID, stream.Conn().RemotePeer(), msg) - var ( - propagate bool - err error - msgs []*BatchMessage - ) if batchHandler != nil { - msgs, err = batchHandler(peer, msg) - if err != nil { - return err - } - - propagate = len(msgs) > 0 - } else { - propagate, err = messageHandler(peer, msg) - if err != nil { - return err - } + batchHandler(peer, msg) + return nil + } - msgs = append(msgs, &BatchMessage{ - msg: msg, - peer: peer, - }) + propagate, err := messageHandler(peer, msg) + if err != nil { + return err } if !propagate || s.noGossip { return nil } - for _, data := range msgs { - seen := s.gossip.hasSeen(data.msg) - if !seen { - s.broadcastExcluding(info, data.peer, data.msg) - } - - // report peer if we get duplicate gossip message. - s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ - Value: peerset.DuplicateGossipValue, - Reason: peerset.DuplicateGossipReason, - }, peer) + if !s.gossip.hasSeen(msg) { + s.broadcastExcluding(info, peer, msg) + return nil } + // report peer if we get duplicate gossip message. + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.DuplicateGossipValue, + Reason: peerset.DuplicateGossipReason, + }, peer) return nil } } diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index 71aeca230b..e0d7b4fc77 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -12,13 +12,13 @@ import ( "time" "unsafe" - "github.com/ChainSafe/gossamer/dot/types" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/lib/utils" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/utils" ) func TestCreateDecoder_BlockAnnounce(t *testing.T) { @@ -302,23 +302,17 @@ func Test_HandshakeTimeout(t *testing.T) { } func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { + const batchSize = 5 basePath := utils.NewTestBasePath(t, "nodeA") - mockhandler := &MockTransactionHandler{} - mockhandler.On("HandleTransactionMessage", - mock.AnythingOfType("peer.ID"), - mock.AnythingOfType("*network.TransactionMessage")). - Return(true, nil) - mockhandler.On("TransactionsCount").Return(0) config := &Config{ - BasePath: basePath, - Port: 7001, - NoBootstrap: true, - NoMDNS: true, - TransactionHandler: mockhandler, + BasePath: basePath, + Port: 7001, + NoBootstrap: true, + NoMDNS: true, + batchSize: batchSize, } - s := createTestService(t, config) - s.batchSize = 5 + srvc1 := createTestService(t, config) configB := &Config{ BasePath: utils.NewTestBasePath(t, "nodeB"), @@ -327,42 +321,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { NoMDNS: true, } - b := createTestService(t, configB) + srvc2 := createTestService(t, configB) - txnBatch := make(chan *BatchMessage, s.batchSize) - txnBatchHandler := s.createBatchMessageHandler(txnBatch) - - // don't set handshake data ie. this stream has just been opened - testPeerID := b.host.id() + txnBatch := make(chan *BatchMessage, batchSize) + txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch) // connect nodes - addrInfoB := b.host.addrInfo() - err := s.host.connect(addrInfoB) + addrInfoB := srvc2.host.addrInfo() + err := srvc1.host.connect(addrInfoB) if failedToDial(err) { time.Sleep(TestBackoffTimeout) - err = s.host.connect(addrInfoB) + err = srvc1.host.connect(addrInfoB) + require.NoError(t, err) } require.NoError(t, err) - stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID) + txnProtocolID := srvc1.host.protocolID + transactionsID + stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID) require.NoError(t, err) - require.Len(t, txnBatch, 0) // create info and handler info := ¬ificationsProtocol{ - protocolID: s.host.protocolID + transactionsID, - getHandshake: s.getTransactionHandshake, + protocolID: txnProtocolID, + getHandshake: srvc1.getTransactionHandshake, handshakeValidator: validateTransactionHandshake, inboundHandshakeData: new(sync.Map), outboundHandshakeData: new(sync.Map), } - handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler) + handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler) // set handshake data to received - info.inboundHandshakeData.Store(testPeerID, &handshakeData{ + info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{ received: true, validated: true, }) + msg := &TransactionMessage{ Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, } @@ -396,11 +389,21 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { } err = handler(stream, msg) require.NoError(t, err) - require.Len(t, txnBatch, 0) + require.Len(t, txnBatch, 5) + + // reached batch size limit, below transaction will not be included in batch. + msg = &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 5) msg = &TransactionMessage{ Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, } + // wait for transaction batch channel to process. + time.Sleep(1300 * time.Millisecond) err = handler(stream, msg) require.NoError(t, err) require.Len(t, txnBatch, 1) diff --git a/dot/network/service.go b/dot/network/service.go index defb9bb71a..cdaac53a85 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -12,16 +12,17 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/metrics" + libp2pnetwork "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/services" - "github.com/ethereum/go-ethereum/metrics" - libp2pnetwork "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" ) const ( @@ -89,8 +90,6 @@ type Service struct { blockResponseBuf []byte blockResponseBufMu sync.Mutex - - batchSize int } // NewService creates a new network service from the configuration and message channels @@ -125,6 +124,9 @@ func NewService(cfg *Config) (*Service, error) { connectToPeersTimeout = cfg.DiscoveryInterval } + if cfg.batchSize == 0 { + cfg.batchSize = defaultTxnBatchSize + } // create a new host instance host, err := newHost(ctx, cfg) if err != nil { @@ -162,7 +164,6 @@ func NewService(cfg *Config) (*Service, error) { bufPool: bufPool, streamManager: newStreamManager(ctx), blockResponseBuf: make([]byte, maxBlockResponseSize), - batchSize: 100, } return network, err @@ -211,7 +212,7 @@ func (s *Service) Start() error { blockAnnounceID, err) } - txnBatch := make(chan *BatchMessage, s.batchSize) + txnBatch := make(chan *BatchMessage, s.cfg.batchSize) txnBatchHandler := s.createBatchMessageHandler(txnBatch) // register transactions protocol diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 43c0b7613f..a33dd8dec9 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -53,6 +53,8 @@ func createServiceHelper(t *testing.T, num int) []*Service { // helper method to create and start a new network service func createTestService(t *testing.T, cfg *Config) (srvc *Service) { + t.Helper() + if cfg == nil { basePath := utils.NewTestBasePath(t, "node") @@ -73,12 +75,14 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { mocktxhandler := &MockTransactionHandler{} mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("peer.ID"), - mock.AnythingOfType("*TransactionMessage")). - Return(nil) + mock.AnythingOfType("*network.TransactionMessage")). + Return(true, nil) mocktxhandler.On("TransactionsCount").Return(0) cfg.TransactionHandler = mocktxhandler } + cfg.SlotDuration = time.Second + cfg.ProtocolID = TestProtocolID // default "/gossamer/gssmr/0" if cfg.LogLvl == 0 { diff --git a/dot/network/test_helpers.go b/dot/network/test_helpers.go index c428aad256..73c2fbcad8 100644 --- a/dot/network/test_helpers.go +++ b/dot/network/test_helpers.go @@ -8,10 +8,11 @@ import ( "io" "math/big" + "github.com/stretchr/testify/mock" + "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/common/variadic" - "github.com/stretchr/testify/mock" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -75,7 +76,7 @@ func NewMockTransactionHandler() *MockTransactionHandler { mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("peer.ID"), mock.AnythingOfType("*network.TransactionMessage")). - Return(nil) + Return(true, nil) mocktxhandler.On("TransactionsCount").Return(0) return mocktxhandler } diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 0b64a02bf4..cad849f3d5 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -6,12 +6,13 @@ package network import ( "errors" "fmt" + "time" + + "github.com/libp2p/go-libp2p-core/peer" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/pkg/scale" - - "github.com/libp2p/go-libp2p-core/peer" ) var ( @@ -19,6 +20,9 @@ var ( _ NotificationsMessage = &transactionHandshake{} ) +// txnBatchChTimeout is the timeout for adding a transaction to the batch processing channel +const txnBatchChTimeout = time.Millisecond * 200 + // TransactionMessage is a network message that is sent to notify of new transactions entering the network type TransactionMessage struct { Extrinsics []types.Extrinsic @@ -106,36 +110,61 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) { return &transactionHandshake{}, nil } -func (s *Service) createBatchMessageHandler(txnBatch chan *BatchMessage) NotificationsMessageBatchHandler { - return func(peer peer.ID, msg NotificationsMessage) (msgs []*BatchMessage, err error) { +func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage) { + protocolID := s.host.protocolID + transactionsID + ticker := time.NewTicker(s.cfg.SlotDuration) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + timer := time.NewTimer(s.cfg.SlotDuration / 3) + var timedOut bool + for !timedOut { + select { + case <-timer.C: + timedOut = true + case txnMsg := <-txnBatchCh: + propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) + if err != nil { + s.host.closeProtocolStream(protocolID, txnMsg.peer) + continue + } + + if s.noGossip || !propagate { + continue + } + + if !s.gossip.hasSeen(txnMsg.msg) { + s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg) + } + } + } + } + } +} + +func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler { + go s.startTxnBatchProcessing(txnBatchCh) + + return func(peer peer.ID, msg NotificationsMessage) { data := &BatchMessage{ msg: msg, peer: peer, } - txnBatch <- data - if len(txnBatch) < s.batchSize { - return nil, nil - } + timer := time.NewTimer(txnBatchChTimeout) - var propagateMsgs []*BatchMessage - for txnData := range txnBatch { - propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg) - if err != nil { - continue - } - if propagate { - propagateMsgs = append(propagateMsgs, &BatchMessage{ - msg: txnData.msg, - peer: txnData.peer, - }) - } - if len(txnBatch) == 0 { - break + select { + case txnBatchCh <- data: + if !timer.Stop() { + <-timer.C } + case <-timer.C: + logger.Debugf("transaction message %s for peer %s not included into batch", msg, peer) } - // May be use error to compute peer score. - return propagateMsgs, nil } } diff --git a/dot/network/transaction_test.go b/dot/network/transaction_test.go index 76d79d1b21..49f0cbf4b6 100644 --- a/dot/network/transaction_test.go +++ b/dot/network/transaction_test.go @@ -6,9 +6,10 @@ package network import ( "testing" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/utils" - "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" diff --git a/dot/rpc/modules/system_test.go b/dot/rpc/modules/system_test.go index 5f50327430..ff0c2bd1bc 100644 --- a/dot/rpc/modules/system_test.go +++ b/dot/rpc/modules/system_test.go @@ -12,6 +12,11 @@ import ( "testing" "time" + "github.com/btcsuite/btcutil/base58" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/ChainSafe/gossamer/dot/core" coremocks "github.com/ChainSafe/gossamer/dot/core/mocks" "github.com/ChainSafe/gossamer/dot/network" @@ -27,10 +32,6 @@ import ( "github.com/ChainSafe/gossamer/lib/transaction" "github.com/ChainSafe/gossamer/lib/trie" "github.com/ChainSafe/gossamer/pkg/scale" - "github.com/btcsuite/btcutil/base58" - "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) var ( @@ -50,6 +51,7 @@ func newNetworkService(t *testing.T) *network.Service { BasePath: testDir, Syncer: network.NewMockSyncer(), TransactionHandler: network.NewMockTransactionHandler(), + SlotDuration: time.Second, } srv, err := network.NewService(cfg) diff --git a/dot/services.go b/dot/services.go index 9b5f8a7698..aacf37f279 100644 --- a/dot/services.go +++ b/dot/services.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/ChainSafe/chaindb" + "github.com/ChainSafe/gossamer/dot/core" "github.com/ChainSafe/gossamer/dot/digest" "github.com/ChainSafe/gossamer/dot/network" @@ -251,6 +252,11 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi cfg.Core.Roles, cfg.Network.Port, strings.Join(cfg.Network.Bootnodes, ","), cfg.Network.ProtocolID, cfg.Network.NoBootstrap, cfg.Network.NoMDNS) + slotDuration, err := stateSrvc.Epoch.GetSlotDuration() + if err != nil { + return nil, fmt.Errorf("cannot get slot duration: %w", err) + } + // network service configuation networkConfig := network.Config{ LogLvl: cfg.Log.NetworkLvl, @@ -267,6 +273,7 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi PublishMetrics: cfg.Global.PublishMetrics, PersistentPeers: cfg.Network.PersistentPeers, DiscoveryInterval: cfg.Network.DiscoveryInterval, + SlotDuration: slotDuration, PublicIP: cfg.Network.PublicIP, } diff --git a/lib/grandpa/mocks/network.go b/lib/grandpa/mocks/network.go index 9ca106574c..e33e9f3629 100644 --- a/lib/grandpa/mocks/network.go +++ b/lib/grandpa/mocks/network.go @@ -22,11 +22,11 @@ func (_m *Network) GossipMessage(msg network.NotificationsMessage) { } // RegisterNotificationsProtocol provides a mock function with given fields: sub, messageID, handshakeGetter, handshakeDecoder, handshakeValidator, messageDecoder, messageHandler, batchHandler -func (_m *Network) RegisterNotificationsProtocol(sub protocol.ID, messageID byte, handshakeGetter func() (network.Handshake, error), handshakeDecoder func([]byte) (network.Handshake, error), handshakeValidator func(peer.ID, network.Handshake) error, messageDecoder func([]byte) (network.NotificationsMessage, error), messageHandler func(peer.ID, network.NotificationsMessage) (bool, error), batchHandler func(peer.ID, network.NotificationsMessage) ([]*network.BatchMessage, error)) error { +func (_m *Network) RegisterNotificationsProtocol(sub protocol.ID, messageID byte, handshakeGetter func() (network.Handshake, error), handshakeDecoder func([]byte) (network.Handshake, error), handshakeValidator func(peer.ID, network.Handshake) error, messageDecoder func([]byte) (network.NotificationsMessage, error), messageHandler func(peer.ID, network.NotificationsMessage) (bool, error), batchHandler func(peer.ID, network.NotificationsMessage)) error { ret := _m.Called(sub, messageID, handshakeGetter, handshakeDecoder, handshakeValidator, messageDecoder, messageHandler, batchHandler) var r0 error - if rf, ok := ret.Get(0).(func(protocol.ID, byte, func() (network.Handshake, error), func([]byte) (network.Handshake, error), func(peer.ID, network.Handshake) error, func([]byte) (network.NotificationsMessage, error), func(peer.ID, network.NotificationsMessage) (bool, error), func(peer.ID, network.NotificationsMessage) ([]*network.BatchMessage, error)) error); ok { + if rf, ok := ret.Get(0).(func(protocol.ID, byte, func() (network.Handshake, error), func([]byte) (network.Handshake, error), func(peer.ID, network.Handshake) error, func([]byte) (network.NotificationsMessage, error), func(peer.ID, network.NotificationsMessage) (bool, error), func(peer.ID, network.NotificationsMessage)) error); ok { r0 = rf(sub, messageID, handshakeGetter, handshakeDecoder, handshakeValidator, messageDecoder, messageHandler, batchHandler) } else { r0 = ret.Error(0) diff --git a/lib/grandpa/state.go b/lib/grandpa/state.go index e002274df0..7ba4087449 100644 --- a/lib/grandpa/state.go +++ b/lib/grandpa/state.go @@ -6,11 +6,12 @@ package grandpa import ( "math/big" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" ) // BlockState is the interface required by GRANDPA into the block state diff --git a/tests/stress/stress_test.go b/tests/stress/stress_test.go index 15e93942e2..9515a9d0ac 100644 --- a/tests/stress/stress_test.go +++ b/tests/stress/stress_test.go @@ -13,15 +13,16 @@ import ( "testing" "time" - gosstypes "github.com/ChainSafe/gossamer/dot/types" - "github.com/ChainSafe/gossamer/internal/log" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/tests/utils" gsrpc "github.com/centrifuge/go-substrate-rpc-client/v3" "github.com/centrifuge/go-substrate-rpc-client/v3/signature" "github.com/centrifuge/go-substrate-rpc-client/v3/types" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" + + gosstypes "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/tests/utils" ) func TestMain(m *testing.M) {