Merge branch 'development' into jimmy/integrateScaleGenesis
jimjbrettj committed Jul 7, 2021
2 parents 0d21269 + 3350232 commit b2c59cd
Showing 34 changed files with 1,427 additions and 727 deletions.
10 changes: 5 additions & 5 deletions dot/digest/digest_test.go
@@ -138,14 +138,14 @@ func TestHandler_GrandpaScheduledChange(t *testing.T) {
require.NoError(t, err)

headers := addTestBlocksToState(t, 2, handler.blockState)
for _, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), 0, 0)
for i, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), uint64(i), 0)
}

// authorities should change on start of block 3 from start
headers = addTestBlocksToState(t, 1, handler.blockState)
for _, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), 0, 0)
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), 3, 0)
}

time.Sleep(time.Millisecond * 500)
@@ -231,8 +231,8 @@ func TestHandler_GrandpaPauseAndResume(t *testing.T) {
require.Equal(t, big.NewInt(int64(p.Delay)), nextPause)

headers := addTestBlocksToState(t, 3, handler.blockState)
for _, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), 0, 0)
for i, h := range headers {
handler.blockState.(*state.BlockState).SetFinalizedHash(h.Hash(), uint64(i), 0)
}

time.Sleep(time.Millisecond * 100)
7 changes: 6 additions & 1 deletion dot/network/notifications.go
@@ -281,7 +281,12 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

if !added {
// TODO: ensure grandpa stores *all* previously received votes and discards them
// only once they are for already-finalised rounds; currently a vote can arrive
// slightly too early, trigger a round mismatch error, and be discarded by grandpa.
_, isConsensusMsg := msg.(*ConsensusMessage)
if !added && !isConsensusMsg {
return
}
}
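
The TODO above sketches the eventual fix: buffer votes that arrive before their round starts and replay them later, discarding them only once their round is finalised. Below is a minimal illustration of that buffering idea, using hypothetical types (signedVote and voteBuffer are assumptions for illustration, not gossamer's grandpa types).

package main

import "fmt"

// signedVote is a stand-in for a grandpa vote message (hypothetical; the real
// type lives in the grandpa package and carries much more data).
type signedVote struct {
	round uint64
	data  string
}

// voteBuffer keeps votes that arrive before their round starts so they can be
// replayed later, rather than being dropped on a round-mismatch error.
type voteBuffer struct {
	pending map[uint64][]signedVote
}

func newVoteBuffer() *voteBuffer {
	return &voteBuffer{pending: make(map[uint64][]signedVote)}
}

// push stores an early vote keyed by its round.
func (b *voteBuffer) push(v signedVote) {
	b.pending[v.round] = append(b.pending[v.round], v)
}

// drain returns and clears every buffered vote for rounds up to and including
// current; nothing is discarded until its round is actually reachable.
func (b *voteBuffer) drain(current uint64) []signedVote {
	var out []signedVote
	for r, votes := range b.pending {
		if r <= current {
			out = append(out, votes...)
			delete(b.pending, r)
		}
	}
	return out
}

func main() {
	buf := newVoteBuffer()
	buf.push(signedVote{round: 5, data: "prevote"})
	fmt.Println(len(buf.drain(4))) // 0: round 5 has not started yet
	fmt.Println(len(buf.drain(5))) // 1: the early vote is replayed, not lost
}
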
49 changes: 28 additions & 21 deletions dot/network/sync.go
@@ -36,6 +36,16 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// SendBlockReqestByHash sends a block request to the network with the given block hash
func (s *Service) SendBlockReqestByHash(hash common.Hash) {
req := createBlockRequestWithHash(hash, blockRequestSize)
s.syncQueue.requestDataByHash.Delete(hash)
s.syncQueue.trySync(&syncRequest{
req: req,
to: "",
})
}
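
A hedged usage sketch for the new helper: a hypothetical caller that holds the node's *network.Service and has learned of a block hash it cannot resolve locally. The caller, its name, and the scenario are assumptions for illustration; only SendBlockReqestByHash itself comes from the diff above.

package example

import (
	"github.com/ChainSafe/gossamer/dot/network"
	"github.com/ChainSafe/gossamer/lib/common"
)

// requestMissingBlock is a hypothetical caller: when another service sees a
// block hash it cannot resolve locally (for example while handling a
// justification), it can ask the network service to fetch that exact block.
func requestMissingBlock(net *network.Service, missing common.Hash) {
	// The helper clears any stale per-hash request data in the sync queue and
	// hands a hash-based block request to the queue's trySync path.
	net.SendBlockReqestByHash(missing)
}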

// handleSyncStream handles streams with the <protocol-id>/sync/2 protocol ID
func (s *Service) handleSyncStream(stream libp2pnetwork.Stream) {
if stream == nil {
@@ -537,7 +547,11 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error
}

q.responses = sortResponses(q.responses)
logger.Debug("pushed block data to queue", "start", start, "end", end, "queue", q.stringifyResponseQueue())
logger.Debug("pushed block data to queue", "start", start, "end", end,
"start hash", q.responses[0].Hash,
"end hash", q.responses[len(q.responses)-1].Hash,
"queue", q.stringifyResponseQueue(),
)
return nil
}

@@ -611,9 +625,10 @@ func (q *syncQueue) trySync(req *syncRequest) {
logger.Trace("trying peers in prioritised order...")
syncPeers := q.getSortedPeers()

for _, peer := range syncPeers {
for i, peer := range syncPeers {
// if peer doesn't respond multiple times, then ignore them TODO: determine best values for this
if peer.score <= badPeerThreshold {
// TODO: if we only have a few peers, should we do this check at all?
if peer.score <= badPeerThreshold && i > q.s.cfg.MinPeers {
break
}
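
The new check walks peers in prioritised (highest score first) order and only cuts off low-scoring peers once more than cfg.MinPeers candidates have already been passed, so a node with very few peers still has someone to sync from. A minimal sketch of that selection rule in isolation; the syncPeer struct and the threshold value here are assumptions for illustration.

package example

// syncPeer pairs a peer with its sync score; a stand-in for the unexported
// type the sync queue sorts by score.
type syncPeer struct {
	id    string
	score int
}

const badPeerThreshold = -2 // assumed value, for illustration only

// pickPeers mirrors the selection rule above: walk peers highest-score first,
// and only stop at low-scoring peers once more than minPeers candidates have
// been considered.
func pickPeers(sorted []syncPeer, minPeers int) []syncPeer {
	var out []syncPeer
	for i, p := range sorted {
		if p.score <= badPeerThreshold && i > minPeers {
			break
		}
		out = append(out, p)
	}
	return out
}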

@@ -647,9 +662,6 @@

q.justificationRequestData.Store(startingBlockHash, reqdata)
}

req.to = ""
q.requestCh <- req
}

func (q *syncQueue) syncWithPeer(peer peer.ID, req *BlockRequestMessage) (*BlockResponseMessage, error) {
@@ -737,7 +749,7 @@ func (q *syncQueue) handleBlockData(data []*types.BlockData) {

end := data[len(data)-1].Number().Int64()
if end <= finalised.Number.Int64() {
logger.Debug("ignoring block data that is below our head", "got", end, "head", finalised.Number.Int64())
logger.Debug("ignoring block data that is below our finalised head", "got", end, "head", finalised.Number.Int64())
q.pushRequest(uint64(end+1), blockRequestBufferSize, "")
return
}
@@ -844,21 +856,16 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID)
return
}

if header.Number.Int64() <= q.goal {
return
if header.Number.Int64() > q.goal {
q.goal = header.Number.Int64()
}

q.goal = header.Number.Int64()

bestNum, err := q.s.blockState.BestBlockNumber()
if err != nil {
logger.Error("failed to get best block number", "error", err)
return
req := createBlockRequestWithHash(header.Hash(), blockRequestSize)
q.requestDataByHash.Delete(req)
q.requestCh <- &syncRequest{
req: req,
to: from,
}

// TODO: if we're at the head, this should request by hash instead of number, since there will
// certainly be blocks with the same number.
q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from)
}
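
The removed TODO pointed out that near the chain head several blocks can share the same number, so the handler now requests the announced block by its hash. A small sketch of why a hash is the safer key for tracking announce-driven requests; requestTracker is an assumption for illustration, not the sync queue's actual requestDataByHash.

package example

import (
	"sync"

	"github.com/ChainSafe/gossamer/lib/common"
)

// requestTracker records which announced blocks have already been requested.
// Keying by hash uniquely identifies a block even when competing forks share
// the same block number near the head.
type requestTracker struct {
	byHash sync.Map // common.Hash -> struct{}
}

// shouldRequest stores the hash and reports whether it is new, so the same
// announced block is not requested twice.
func (t *requestTracker) shouldRequest(h common.Hash) bool {
	_, loaded := t.byHash.LoadOrStore(h, struct{}{})
	return !loaded
}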

func createBlockRequest(startInt int64, size uint32) *BlockRequestMessage {
@@ -875,7 +882,7 @@ func createBlockRequest(startInt int64, size uint32) *BlockRequestMessage {
RequestedData: RequestedDataHeader + RequestedDataBody + RequestedDataJustification,
StartingBlock: start,
EndBlockHash: optional.NewHash(false, common.Hash{}),
Direction: 0, // ascending
Direction: 0, // TODO: define this somewhere
Max: max,
}

@@ -896,7 +903,7 @@ func createBlockRequestWithHash(startHash common.Hash, size uint32) *BlockRequestMessage {
RequestedData: RequestedDataHeader + RequestedDataBody + RequestedDataJustification,
StartingBlock: start,
EndBlockHash: optional.NewHash(false, common.Hash{}),
Direction: 0, // ascending
Direction: 0, // TODO: define this somewhere
Max: max,
}

9 changes: 5 additions & 4 deletions dot/network/sync_test.go
@@ -271,11 +271,12 @@ func TestSyncQueue_HandleBlockAnnounce(t *testing.T) {
require.True(t, ok)
require.Equal(t, 1, score.(int))
require.Equal(t, testBlockAnnounceMessage.Number.Int64(), q.goal)
require.Equal(t, 6, len(q.requestCh))
require.Equal(t, 1, len(q.requestCh))

head, err := q.s.blockState.BestBlockNumber()
require.NoError(t, err)
expected := createBlockRequest(head.Int64(), blockRequestSize)
header := &types.Header{
Number: testBlockAnnounceMessage.Number,
}
expected := createBlockRequestWithHash(header.Hash(), blockRequestSize)
req := <-q.requestCh
require.Equal(t, &syncRequest{req: expected, to: testPeerID}, req)
}
6 changes: 2 additions & 4 deletions dot/node.go
@@ -308,7 +308,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
nodeSrvcs = append(nodeSrvcs, sysSrvc)

// check if rpc service is enabled
if enabled := cfg.RPC.Enabled; enabled {
if enabled := cfg.RPC.Enabled || cfg.RPC.WS; enabled {
rpcSrvc := createRPCService(cfg, stateSrvc, coreSrvc, networkSrvc, bp, rt, sysSrvc)
nodeSrvcs = append(nodeSrvcs, rpcSrvc)
} else {
@@ -346,9 +346,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
return nil, err
}

if cfg.Global.NoTelemetry {
return node, nil
}
telemetry.GetInstance().Initialise(!cfg.Global.NoTelemetry)

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
genesisHash := stateSrvc.Block.GenesisHash()
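
With the early return removed, telemetry is always initialised and handed the enabled flag, so the AddConnections call above no longer needs to be guarded at the call site. A rough sketch of that enable-flag singleton pattern, with hypothetical internals; this is not gossamer's actual telemetry package.

package example

import "sync"

// Telemetry sketches the enable-flag pattern implied by
// telemetry.GetInstance().Initialise(!cfg.Global.NoTelemetry): the singleton
// is always safe to call and simply drops messages when disabled, so callers
// never need their own "is telemetry on?" checks.
type Telemetry struct {
	mu      sync.Mutex
	enabled bool
}

var instance = &Telemetry{}

// GetInstance returns the process-wide telemetry handler.
func GetInstance() *Telemetry { return instance }

// Initialise turns the sender on or off for the lifetime of the process.
func (t *Telemetry) Initialise(enabled bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.enabled = enabled
}

// SendMessage forwards a message to the telemetry endpoints, or silently
// drops it when telemetry is disabled.
func (t *Telemetry) SendMessage(msg string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.enabled {
		return
	}
	// ... forward msg to the configured endpoints ...
}
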
4 changes: 4 additions & 0 deletions dot/rpc/http.go
@@ -21,6 +21,7 @@ import (
"net"
"net/http"
"os"
"time"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/rpc/subscription"
@@ -242,6 +243,9 @@ func NewWSConn(conn *websocket.Conn, cfg *HTTPServerConfig) *subscription.WSConn
CoreAPI: cfg.CoreAPI,
TxStateAPI: cfg.TransactionQueueAPI,
RPCHost: fmt.Sprintf("http://%s:%d/", cfg.Host, cfg.RPCPort),
HTTP: &http.Client{
Timeout: time.Second * 30,
},
}
return c
}
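
The new field gives the WSConn's HTTP client an explicit 30-second timeout; a zero-value http.Client never times out, so a hung RPC endpoint could otherwise block the websocket handler indefinitely. A standalone illustration of the same construction, with an assumed helper name.

package example

import (
	"net/http"
	"time"
)

// newRPCClient mirrors the change above: give outbound RPC calls an explicit
// deadline, because the zero-value http.Client waits forever by default.
func newRPCClient() *http.Client {
	return &http.Client{
		Timeout: 30 * time.Second,
	}
}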