Skip to content

Commit

Permalink
[p2p] Improve p2p (#3409)
Browse files Browse the repository at this point in the history
* update p2p pkg
  • Loading branch information
Liuhaai authored Jul 6, 2022
1 parent 4e397d3 commit 2a70177
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 262 deletions.
3 changes: 3 additions & 0 deletions api/serverV2_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ func createServerV2(cfg config.Config, needActPool bool) (*ServerV2, blockchain.
opts := []Option{WithBroadcastOutbound(func(ctx context.Context, chainID uint32, msg proto.Message) error {
return nil
})}
cfg.API.GRPCPort = testutil.RandomPort()
cfg.API.HTTPPort = 0
cfg.API.WebSocketPort = 0
svr, err := NewServerV2(cfg.API, bc, nil, sf, dao, indexer, bfIndexer, ap, registry, opts...)
if err != nil {
return nil, nil, nil, nil, nil, nil, "", err
Expand Down
72 changes: 53 additions & 19 deletions blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ package blocksync
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/iotexproject/iotex-proto/golang/iotexrpc"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/config"
Expand All @@ -24,8 +28,12 @@ import (
)

type (
// RequestBlocks send a block request to peers
RequestBlocks func(ctx context.Context, start uint64, end uint64, repeat int)
// Neighbors acquires p2p neighbors in the network
Neighbors func() ([]peer.AddrInfo, error)
// UniCastOutbound sends a unicase message to the peer
UniCastOutbound func(context.Context, peer.AddrInfo, proto.Message) error
// BlockPeer adds the peer into blacklist in p2p layer
BlockPeer func(string)
// TipHeight returns the tip height of blockchain
TipHeight func() uint64
// BlockByHeight returns the block of a given height
Expand All @@ -40,7 +48,7 @@ type (
// TargetHeight returns the target height to sync to
TargetHeight() uint64
// ProcessSyncRequest processes a block sync request
ProcessSyncRequest(context.Context, uint64, uint64, func(context.Context, *block.Block) error) error
ProcessSyncRequest(context.Context, peer.AddrInfo, uint64, uint64) error
// ProcessBlock processes an incoming block
ProcessBlock(context.Context, string, *block.Block) error
// SyncStatus report block sync status
Expand All @@ -57,7 +65,9 @@ type (
tipHeightHandler TipHeight
blockByHeightHandler BlockByHeight
commitBlockHandler CommitBlock
requestBlocksHandler RequestBlocks
p2pNeighbor Neighbors
unicastOutbound UniCastOutbound
blockP2pPeer BlockPeer

syncTask *routine.RecurringTask
syncStageTask *routine.RecurringTask
Expand All @@ -70,8 +80,6 @@ type (
lastTipUpdateTime time.Time
targetHeight uint64 // block number of the highest block header this node has received from peers
mu sync.RWMutex

peerBlockList sync.Map
}

peerBlock struct {
Expand Down Expand Up @@ -104,7 +112,7 @@ func (*dummyBlockSync) TargetHeight() uint64 {
return 0
}

func (*dummyBlockSync) ProcessSyncRequest(context.Context, uint64, uint64, func(context.Context, *block.Block) error) error {
func (*dummyBlockSync) ProcessSyncRequest(context.Context, peer.AddrInfo, uint64, uint64) error {
return nil
}

Expand All @@ -122,7 +130,9 @@ func NewBlockSyncer(
tipHeightHandler TipHeight,
blockByHeightHandler BlockByHeight,
commitBlockHandler CommitBlock,
requestBlocksHandler RequestBlocks,
p2pNeighbor Neighbors,
uniCastHandler UniCastOutbound,
blockP2pPeer BlockPeer,
) (BlockSync, error) {
bs := &blockSyncer{
cfg: cfg,
Expand All @@ -131,7 +141,9 @@ func NewBlockSyncer(
tipHeightHandler: tipHeightHandler,
blockByHeightHandler: blockByHeightHandler,
commitBlockHandler: commitBlockHandler,
requestBlocksHandler: requestBlocksHandler,
p2pNeighbor: p2pNeighbor,
unicastOutbound: uniCastHandler,
blockP2pPeer: blockP2pPeer,
targetHeight: 0,
}
if bs.cfg.Interval != 0 {
Expand All @@ -151,8 +163,7 @@ func (bs *blockSyncer) commitBlocks(blks []*peerBlock) bool {
if err == nil {
return true
}
bs.peerBlockList.Store(blk.pid, true)

bs.blockP2pPeer(blk.pid)
log.L().Error("failed to commit block", zap.Error(err), zap.Uint64("height", blk.block.Height()), zap.String("peer", blk.pid))
}
return false
Expand Down Expand Up @@ -181,7 +192,35 @@ func (bs *blockSyncer) sync() {
zap.Any("intervals", intervals),
zap.Uint64("targetHeight", targetHeight))
for i, interval := range intervals {
bs.requestBlocksHandler(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep)
bs.requestBlock(context.Background(), interval.Start, interval.End, bs.cfg.MaxRepeat-i/bs.cfg.RepeatDecayStep)
}
}

func (bs *blockSyncer) requestBlock(ctx context.Context, start uint64, end uint64, repeat int) {
peers, err := bs.p2pNeighbor()
if err != nil {
log.L().Error("failed to get neighbours", zap.Error(err))
return
}
if len(peers) == 0 {
log.L().Error("no peers")
return
}
if repeat < 2 {
repeat = 2
}
if repeat > len(peers) {
repeat = len(peers)
}
for i := 0; i < repeat; i++ {
peer := peers[rand.Intn(len(peers))]
if err := bs.unicastOutbound(
ctx,
peer,
&iotexrpc.BlockSync{Start: start, End: end},
); err != nil {
log.L().Error("failed to request blocks", zap.Error(err), zap.String("peer", peer.ID.Pretty()), zap.Uint64("start", start), zap.Uint64("end", end))
}
}
}

Expand Down Expand Up @@ -226,11 +265,6 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block
return errors.New("block is nil")
}

if _, ok := bs.peerBlockList.Load(peer); ok {
log.L().Info("peer in block list.")
return nil
}

tip := bs.tipHeightHandler()
added, targetHeight := bs.buf.AddBlock(tip, newPeerBlock(peer, blk))
bs.mu.Lock()
Expand All @@ -257,7 +291,7 @@ func (bs *blockSyncer) ProcessBlock(ctx context.Context, peer string, blk *block
return nil
}

func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, start uint64, end uint64, callback func(context.Context, *block.Block) error) error {
func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, peer peer.AddrInfo, start uint64, end uint64) error {
tip := bs.tipHeightHandler()
if end > tip {
log.L().Debug(
Expand All @@ -277,7 +311,7 @@ func (bs *blockSyncer) ProcessSyncRequest(ctx context.Context, start uint64, end
// TODO: send back multiple blocks in one shot
syncCtx, cancel := context.WithTimeout(ctx, bs.cfg.ProcessSyncRequestTTL)
defer cancel()
if err := callback(syncCtx, blk); err != nil {
if err := bs.unicastOutbound(syncCtx, peer, blk.ConvertToBlockPb()); err != nil {
return err
}
}
Expand Down
135 changes: 32 additions & 103 deletions blocksync/blocksync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (

"github.com/golang/mock/gomock"
"github.com/iotexproject/go-pkgs/hash"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/action/protocol"
"github.com/iotexproject/iotex-core/action/protocol/account"
Expand All @@ -38,22 +40,33 @@ import (
)

func newBlockSyncer(cfg config.BlockSync, chain blockchain.Blockchain, dao blockdao.BlockDAO, cs consensus.Consensus) (*blockSyncer, error) {
bs, err := NewBlockSyncer(cfg, chain.TipHeight, func(h uint64) (*block.Block, error) {
return dao.GetBlockByHeight(h)
}, func(blk *block.Block) error {
if err := cs.ValidateBlockFooter(blk); err != nil {
return err
}
if err := chain.ValidateBlock(blk); err != nil {
return err
}
if err := chain.CommitBlock(blk); err != nil {
return err
}
cs.Calibrate(blk.Height())
return nil
}, func(context.Context, uint64, uint64, int) {
})
bs, err := NewBlockSyncer(cfg, chain.TipHeight,
func(h uint64) (*block.Block, error) {
return dao.GetBlockByHeight(h)
},
func(blk *block.Block) error {
if err := cs.ValidateBlockFooter(blk); err != nil {
return err
}
if err := chain.ValidateBlock(blk); err != nil {
return err
}
if err := chain.CommitBlock(blk); err != nil {
return err
}
cs.Calibrate(blk.Height())
return nil
},
func() ([]peer.AddrInfo, error) {
return []peer.AddrInfo{}, nil
},
func(context.Context, peer.AddrInfo, proto.Message) error {
return nil
},
func(string) {
return
},
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,9 +151,7 @@ func TestBlockSyncerProcessSyncRequest(t *testing.T) {

bs, err := newBlockSyncer(cfg.BlockSync, mBc, dao, cs)
assert.NoError(err)
assert.NoError(bs.ProcessSyncRequest(context.Background(), 1, 1, func(context.Context, *block.Block) error {
return nil
}))
assert.NoError(bs.ProcessSyncRequest(context.Background(), peer.AddrInfo{}, 1, 1))
}

func TestBlockSyncerProcessSyncRequestError(t *testing.T) {
Expand All @@ -160,9 +171,7 @@ func TestBlockSyncerProcessSyncRequestError(t *testing.T) {
bs, err := newBlockSyncer(cfg.BlockSync, chain, dao, cs)
require.NoError(err)

require.Error(bs.ProcessSyncRequest(context.Background(), 1, 5, func(context.Context, *block.Block) error {
return nil
}))
require.Error(bs.ProcessSyncRequest(context.Background(), peer.AddrInfo{}, 1, 5))
}

func TestBlockSyncerProcessBlockTipHeight(t *testing.T) {
Expand Down Expand Up @@ -494,93 +503,13 @@ func newTestConfig() (config.Config, error) {
return cfg, nil
}

func TestBlockSyncerPeerBlockList(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)

ctx := context.Background()
cfg, err := newTestConfig()
require.NoError(err)
registry := protocol.NewRegistry()
acc := account.NewProtocol(rewarding.DepositGas)
require.NoError(acc.Register(registry))
rp := rolldpos.NewProtocol(cfg.Genesis.NumCandidateDelegates, cfg.Genesis.NumDelegates, cfg.Genesis.NumSubEpochs)
require.NoError(rp.Register(registry))
sf, err := factory.NewFactory(cfg, factory.InMemTrieOption(), factory.RegistryOption(registry))
require.NoError(err)
ap, err := actpool.NewActPool(cfg.Genesis, sf, cfg.ActPool, actpool.EnableExperimentalActions())
require.NotNil(ap)
require.NoError(err)
ap.AddActionEnvelopeValidators(protocol.NewGenericValidator(sf, accountutil.AccountState))
dao := blockdao.NewBlockDAOInMemForTest([]blockdao.BlockIndexer{sf})
chain := blockchain.NewBlockchain(
cfg,
dao,
factory.NewMinter(sf, ap),
blockchain.BlockValidatorOption(block.NewValidator(sf, ap)),
)
require.NotNil(chain)
require.NoError(chain.Start(ctx))
cs := mock_consensus.NewMockConsensus(ctrl)
cs.EXPECT().ValidateBlockFooter(gomock.Any()).Return(nil).Times(3)
cs.EXPECT().Calibrate(gomock.Any()).Times(2)

bs, err := newBlockSyncer(cfg.BlockSync, chain, dao, cs)
require.NoError(err)

defer func() {
require.NoError(chain.Stop(ctx))
}()

ctx, err = chain.Context(ctx)
require.NoError(err)

h := chain.TipHeight()
blk1, err := chain.MintNewBlock(testutil.TimestampNow())
require.NotNil(blk1)
require.NoError(err)

blk2 := block.NewBlockDeprecated(
uint32(123),
blk1.Height(),
hash.Hash256{},
testutil.TimestampNow(),
identityset.PrivateKey(27).PublicKey(),
nil,
)

require.NoError(bs.ProcessBlock(ctx, "peer1", blk2))
require.NoError(bs.ProcessBlock(ctx, "peer2", blk1))

h2 := chain.TipHeight()
assert.Equal(t, h+1, h2)

blk3, err := chain.MintNewBlock(testutil.TimestampNow())
require.NotNil(blk3)
require.NoError(err)

require.NoError(bs.ProcessBlock(ctx, "peer1", blk3))

h3 := chain.TipHeight()
assert.Equal(t, h2, h3)

blk4, err := chain.MintNewBlock(testutil.TimestampNow())
require.NotNil(blk4)
require.NoError(err)

require.NoError(bs.ProcessBlock(ctx, "peer2", blk4))

h4 := chain.TipHeight()
assert.Equal(t, h2+1, h4)
}

func TestDummyBlockSync(t *testing.T) {
require := require.New(t)
bs := NewDummyBlockSyncer()
require.NoError(bs.Start(nil))
require.NoError(bs.Stop(nil))
require.NoError(bs.ProcessBlock(nil, "", nil))
require.NoError(bs.ProcessSyncRequest(nil, 0, 0, nil))
require.NoError(bs.ProcessSyncRequest(nil, peer.AddrInfo{}, 0, 0))
require.Equal(bs.TargetHeight(), uint64(0))
startingHeight, currentHeight, targetHeight, desc := bs.SyncStatus()
require.Zero(startingHeight)
Expand Down
2 changes: 1 addition & 1 deletion blocksync/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (b *blockBuffer) AddBlock(tipHeight uint64, blk *peerBlock) (bool, uint64)
defer b.mu.Unlock()
blkHeight := blk.block.Height()
if blkHeight <= tipHeight {
return false, 0
return false, blkHeight
}
if blkHeight > tipHeight+b.bufferSize {
return false, tipHeight + b.bufferSize
Expand Down
Loading

0 comments on commit 2a70177

Please sign in to comment.