Skip to content

Commit

Permalink
fix(dot/state): actually prune finalized tries from memory (#2196)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored Jan 20, 2022
1 parent 7d946fd commit e4bc375
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 13 deletions.
1 change: 1 addition & 0 deletions dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) {
func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error {
msgs := s.messages[stream.Conn().RemotePeer()]
s.messages[stream.Conn().RemotePeer()] = append(msgs, msg)

announceHandshake := &BlockAnnounceHandshake{
BestBlockNumber: 0,
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
}

if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) {
// message has already been sent
logger.Tracef("message has already been sent, ignoring: peer=%s msg=%s", peer, msg)
return
}

Expand Down
11 changes: 9 additions & 2 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,20 @@ func Test_HandshakeTimeout(t *testing.T) {
}
require.NoError(t, err)

// clear handshake data from connection handler
time.Sleep(time.Millisecond * 100)
info.outboundHandshakeData.Delete(nodeB.host.id())
connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
for _, stream := range connAToB[0].GetStreams() {
_ = stream.Close()
}

testHandshakeMsg := &BlockAnnounceHandshake{
Roles: 4,
BestBlockNumber: 77,
BestBlockHash: common.Hash{1},
GenesisHash: common.Hash{2},
}
nodeA.GossipMessage(testHandshakeMsg)

info.outboundHandshakeMutexes.Store(nodeB.host.id(), new(sync.Mutex))
go nodeA.sendData(nodeB.host.id(), testHandshakeMsg, info, nil)
Expand All @@ -285,7 +292,7 @@ func Test_HandshakeTimeout(t *testing.T) {
require.False(t, ok)

// a stream should be open until timeout
connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
require.Len(t, connAToB, 1)
require.Len(t, connAToB[0].GetStreams(), 1)

Expand Down
27 changes: 27 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,33 @@ func (s *Service) sentBlockIntervalTelemetry() {
func (s *Service) handleConn(conn libp2pnetwork.Conn) {
// TODO: currently we only have one set so setID is 0, change this once we have more set in peerSet.
s.host.cm.peerSetHandler.Incoming(0, conn.RemotePeer())

// exchange BlockAnnounceHandshake with peer so we can start to
// sync if necessary.
prtl, has := s.notificationsProtocols[BlockAnnounceMsgType]
if !has {
return
}

hs, err := prtl.getHandshake()
if err != nil {
logger.Warnf("failed to get handshake for protocol %s: %s",
prtl.protocolID,
err,
)
return
}

_, err = s.sendHandshake(conn.RemotePeer(), hs, prtl)
if err != nil {
logger.Debugf("failed to send handshake to peer %s on connection: %s",
conn.RemotePeer(),
err,
)
return
}

// leave stream open if there's no error
}

// Stop closes running instances of the host and network services as well as
Expand Down
13 changes: 9 additions & 4 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
Port: availablePort(t),
NoBootstrap: true,
NoMDNS: true,
LogLvl: 4,
LogLvl: log.Warn,
SlotDuration: time.Second,
}
}
Expand Down Expand Up @@ -264,6 +265,7 @@ func TestBroadcastDuplicateMessage(t *testing.T) {
nodeB := createTestService(t, configB)
nodeB.noGossip = true

// TODO: create a decoder that handles both handshakes and messages
handler := newTestStreamHandler(testBlockAnnounceHandshakeDecoder)
nodeB.host.registerStreamHandler(nodeB.host.protocolID+blockAnnounceID, handler.handleStream)

Expand Down Expand Up @@ -292,14 +294,16 @@ func TestBroadcastDuplicateMessage(t *testing.T) {
Digest: types.NewDigest(),
}

delete(handler.messages, nodeA.host.id())

// Only one message will be sent.
for i := 0; i < 5; i++ {
nodeA.GossipMessage(announceMessage)
time.Sleep(time.Millisecond * 10)
}

time.Sleep(time.Millisecond * 200)
require.Equal(t, 1, len(handler.messages[nodeA.host.id()]))
time.Sleep(time.Millisecond * 500)
require.Equal(t, 2, len(handler.messages[nodeA.host.id()]))

nodeA.host.messageCache = nil

Expand All @@ -308,7 +312,8 @@ func TestBroadcastDuplicateMessage(t *testing.T) {
nodeA.GossipMessage(announceMessage)
time.Sleep(time.Millisecond * 10)
}
require.Equal(t, 6, len(handler.messages[nodeA.host.id()]))

require.Equal(t, 7, len(handler.messages[nodeA.host.id()]))
}

func TestService_NodeRoles(t *testing.T) {
Expand Down
15 changes: 9 additions & 6 deletions dot/state/block_finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er
}

logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash)

go func(header *types.Header) {
bs.pruneKeyCh <- header
}(&block.Header)
bs.pruneKeyCh <- &block.Header
}

// if nothing was previously finalised, set the first slot of the network to the
Expand Down Expand Up @@ -235,8 +232,14 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error {
return err
}

// the block will be deleted from the unfinalisedBlockMap in the pruning loop
// in `SetFinalisedHash()`, which calls this function
// delete from the unfinalisedBlockMap and delete reference to in-memory trie
block, has = bs.getAndDeleteUnfinalisedBlock(hash)
if !has {
continue
}

logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash)
bs.pruneKeyCh <- &block.Header
}

return batch.Flush()
Expand Down
1 change: 1 addition & 0 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (s *StorageState) SetSyncing(syncing bool) {
}

func (s *StorageState) pruneKey(keyHeader *types.Header) {
logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash())
s.tries.Delete(keyHeader.StateRoot)
}

Expand Down

0 comments on commit e4bc375

Please sign in to comment.