diff --git a/dot/network/helpers_test.go b/dot/network/helpers_test.go index dc374d6e50..e063c82717 100644 --- a/dot/network/helpers_test.go +++ b/dot/network/helpers_test.go @@ -44,6 +44,7 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) { func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error { msgs := s.messages[stream.Conn().RemotePeer()] s.messages[stream.Conn().RemotePeer()] = append(msgs, msg) + announceHandshake := &BlockAnnounceHandshake{ BestBlockNumber: 0, } diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 281006265f..a3b0006557 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -289,7 +289,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc } if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) { - // message has already been sent + logger.Tracef("message has already been sent, ignoring: peer=%s msg=%s", peer, msg) return } diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index 8e0bb1d0ee..afb633a7e8 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -267,13 +267,20 @@ func Test_HandshakeTimeout(t *testing.T) { } require.NoError(t, err) + // clear handshake data from connection handler + time.Sleep(time.Millisecond * 100) + info.outboundHandshakeData.Delete(nodeB.host.id()) + connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) + for _, stream := range connAToB[0].GetStreams() { + _ = stream.Close() + } + testHandshakeMsg := &BlockAnnounceHandshake{ Roles: 4, BestBlockNumber: 77, BestBlockHash: common.Hash{1}, GenesisHash: common.Hash{2}, } - nodeA.GossipMessage(testHandshakeMsg) info.outboundHandshakeMutexes.Store(nodeB.host.id(), new(sync.Mutex)) go nodeA.sendData(nodeB.host.id(), testHandshakeMsg, info, nil) @@ -285,7 +292,7 @@ func Test_HandshakeTimeout(t *testing.T) { require.False(t, ok) // a stream should be open until timeout - connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) + connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) require.Len(t, connAToB, 1) require.Len(t, connAToB[0].GetStreams(), 1) diff --git a/dot/network/service.go b/dot/network/service.go index 25464f4b02..d266aad6dd 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -430,6 +430,33 @@ func (s *Service) sentBlockIntervalTelemetry() { func (s *Service) handleConn(conn libp2pnetwork.Conn) { // TODO: currently we only have one set so setID is 0, change this once we have more set in peerSet. s.host.cm.peerSetHandler.Incoming(0, conn.RemotePeer()) + + // exchange BlockAnnounceHandshake with peer so we can start to + // sync if necessary. + prtl, has := s.notificationsProtocols[BlockAnnounceMsgType] + if !has { + return + } + + hs, err := prtl.getHandshake() + if err != nil { + logger.Warnf("failed to get handshake for protocol %s: %s", + prtl.protocolID, + err, + ) + return + } + + _, err = s.sendHandshake(conn.RemotePeer(), hs, prtl) + if err != nil { + logger.Debugf("failed to send handshake to peer %s on connection: %s", + conn.RemotePeer(), + err, + ) + return + } + + // leave stream open if there's no error } // Stop closes running instances of the host and network services as well as diff --git a/dot/network/service_test.go b/dot/network/service_test.go index f7474d3e34..80f5276268 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" ) @@ -95,7 +96,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { Port: availablePort(t), NoBootstrap: true, NoMDNS: true, - LogLvl: 4, + LogLvl: log.Warn, SlotDuration: time.Second, } } @@ -264,6 +265,7 @@ func TestBroadcastDuplicateMessage(t *testing.T) { nodeB := createTestService(t, configB) nodeB.noGossip = true + // TODO: create a decoder that handles both handshakes and messages handler := newTestStreamHandler(testBlockAnnounceHandshakeDecoder) nodeB.host.registerStreamHandler(nodeB.host.protocolID+blockAnnounceID, handler.handleStream) @@ -292,14 +294,16 @@ func TestBroadcastDuplicateMessage(t *testing.T) { Digest: types.NewDigest(), } + delete(handler.messages, nodeA.host.id()) + // Only one message will be sent. for i := 0; i < 5; i++ { nodeA.GossipMessage(announceMessage) time.Sleep(time.Millisecond * 10) } - time.Sleep(time.Millisecond * 200) - require.Equal(t, 1, len(handler.messages[nodeA.host.id()])) + time.Sleep(time.Millisecond * 500) + require.Equal(t, 2, len(handler.messages[nodeA.host.id()])) nodeA.host.messageCache = nil @@ -308,7 +312,8 @@ func TestBroadcastDuplicateMessage(t *testing.T) { nodeA.GossipMessage(announceMessage) time.Sleep(time.Millisecond * 10) } - require.Equal(t, 6, len(handler.messages[nodeA.host.id()])) + + require.Equal(t, 7, len(handler.messages[nodeA.host.id()])) } func TestService_NodeRoles(t *testing.T) { diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index 8557bf4721..ac19f469bc 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -152,10 +152,7 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er } logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash) - - go func(header *types.Header) { - bs.pruneKeyCh <- header - }(&block.Header) + bs.pruneKeyCh <- &block.Header } // if nothing was previously finalised, set the first slot of the network to the @@ -235,8 +232,14 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error { return err } - // the block will be deleted from the unfinalisedBlockMap in the pruning loop - // in `SetFinalisedHash()`, which calls this function + // delete from the unfinalisedBlockMap and delete reference to in-memory trie + block, has = bs.getAndDeleteUnfinalisedBlock(hash) + if !has { + continue + } + + logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash) + bs.pruneKeyCh <- &block.Header } return batch.Flush() diff --git a/dot/state/storage.go b/dot/state/storage.go index 5b71fa4592..5e3c749efa 100644 --- a/dot/state/storage.go +++ b/dot/state/storage.go @@ -84,6 +84,7 @@ func (s *StorageState) SetSyncing(syncing bool) { } func (s *StorageState) pruneKey(keyHeader *types.Header) { + logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash()) s.tries.Delete(keyHeader.StateRoot) }