Skip to content

Commit

Permalink
Revert "eth: drop eth/65, the last non-reqid protocol version" (ether…
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe authored Aug 20, 2021
1 parent 5566e5d commit c368f72
Show file tree
Hide file tree
Showing 15 changed files with 609 additions and 179 deletions.
2 changes: 1 addition & 1 deletion cmd/devp2p/internal/ethtest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestEthSuite(t *testing.T) {
if err != nil {
t.Fatalf("could not create new test suite: %v", err)
}
for _, test := range suite.Eth66Tests() {
for _, test := range suite.AllEthTests() {
t.Run(test.Name, func(t *testing.T) {
result := utesting.RunTAP([]utesting.Test{{Name: test.Name, Fn: test.Fn}}, os.Stdout)
if result[0].Failed {
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.mux.Post(DoneEvent{latest})
}
}()
if p.version < eth.ETH66 {
return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66)
if p.version < eth.ETH65 {
return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH65)
}
mode := d.getMode()

Expand Down
95 changes: 93 additions & 2 deletions eth/downloader/downloader_test.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.BlockHeadersMsg, time.Second)
}
return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
}

// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
Expand All @@ -425,7 +425,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.BlockBodiesMsg, time.Second)
}
return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
}

// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
Expand All @@ -437,7 +437,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.ReceiptsMsg, time.Second)
}
return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
}

// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
Expand All @@ -449,7 +449,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.NodeDataMsg, time.Second)
}
return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
}

// idlePeers retrieves a flat list of all currently idle peers satisfying the
Expand Down
5 changes: 4 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type handler struct {
whitelist map[uint64]common.Hash

// channels for fetcher, syncer, txsyncLoop
txsyncCh chan *txsync
quitSync chan struct{}

chainSync *chainSyncer
Expand All @@ -139,6 +140,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
if config.Sync == downloader.FullSync {
Expand Down Expand Up @@ -406,8 +408,9 @@ func (h *handler) Start(maxPeers int) {
go h.minedBroadcastLoop()

// start sync handlers
h.wg.Add(1)
h.wg.Add(2)
go h.chainSync.loop()
go h.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
}

func (h *handler) Stop() {
Expand Down
38 changes: 17 additions & 21 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {

// Tests that peers are correctly accepted (or rejected) based on the advertised
// fork IDs in the protocol handshake.
func TestForkIDSplit65(t *testing.T) { testForkIDSplit(t, eth.ETH65) }
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }

func testForkIDSplit(t *testing.T, protocol uint) {
Expand Down Expand Up @@ -235,6 +236,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
}

// Tests that received transactions are added to the local pool.
func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, eth.ETH65) }
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }

func testRecvTransactions(t *testing.T, protocol uint) {
Expand Down Expand Up @@ -292,6 +294,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
}

// This test checks that pending transactions are sent.
func TestSendTransactions65(t *testing.T) { testSendTransactions(t, eth.ETH65) }
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }

func testSendTransactions(t *testing.T, protocol uint) {
Expand All @@ -303,7 +306,7 @@ func testSendTransactions(t *testing.T, protocol uint) {

insert := make([]*types.Transaction, 100)
for nonce := range insert {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, 10240))
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, txsyncPackSize/10))
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)

insert[nonce] = tx
Expand Down Expand Up @@ -377,6 +380,7 @@ func testSendTransactions(t *testing.T, protocol uint) {

// Tests that transactions get propagated to all attached peers, either via direct
// broadcasts or via announcements/retrievals.
func TestTransactionPropagation65(t *testing.T) { testTransactionPropagation(t, eth.ETH65) }
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }

func testTransactionPropagation(t *testing.T, protocol uint) {
Expand Down Expand Up @@ -517,8 +521,8 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
defer p2pLocal.Close()
defer p2pRemote.Close()

local := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool)
remote := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool)
local := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool)
remote := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool)
defer local.Close()
defer remote.Close()

Expand All @@ -539,39 +543,30 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
t.Fatalf("failed to run protocol handshake")
}

// Connect a new peer and check that we receive the checkpoint challenge.
if checkpoint {
msg, err := p2pRemote.ReadMsg()
if err != nil {
t.Fatalf("failed to read checkpoint challenge: %v", err)
}
request := new(eth.GetBlockHeadersPacket66)
if err := msg.Decode(request); err != nil {
t.Fatalf("failed to decode checkpoint challenge: %v", err)
}
query := request.GetBlockHeadersPacket
if query.Origin.Number != response.Number.Uint64() || query.Amount != 1 || query.Skip != 0 || query.Reverse {
t.Fatalf("challenge mismatch: have [%d, %d, %d, %v] want [%d, %d, %d, %v]",
query.Origin.Number, query.Amount, query.Skip, query.Reverse,
response.Number.Uint64(), 1, 0, false)
if err := remote.ExpectRequestHeadersByNumber(response.Number.Uint64(), 1, 0, false); err != nil {
t.Fatalf("challenge mismatch: %v", err)
}
// Create a block to reply to the challenge if no timeout is simulated.
if !timeout {
if empty {
if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{}); err != nil {
if err := remote.SendBlockHeaders([]*types.Header{}); err != nil {
t.Fatalf("failed to answer challenge: %v", err)
}
} else if match {
if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{response}); err != nil {
if err := remote.SendBlockHeaders([]*types.Header{response}); err != nil {
t.Fatalf("failed to answer challenge: %v", err)
}
} else {
if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{{Number: response.Number}}); err != nil {
if err := remote.SendBlockHeaders([]*types.Header{{Number: response.Number}}); err != nil {
t.Fatalf("failed to answer challenge: %v", err)
}
}
}
}

// Wait until the test timeout passes to ensure proper cleanup
time.Sleep(syncChallengeTimeout + 300*time.Millisecond)

Expand Down Expand Up @@ -624,8 +619,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
defer sourcePipe.Close()
defer sinkPipe.Close()

sourcePeer := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil)
sinkPeer := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil)
sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil)
sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil)
defer sourcePeer.Close()
defer sinkPeer.Close()

Expand Down Expand Up @@ -676,6 +671,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {

// Tests that a propagated malformed block (uncles or transactions don't match
// with the hashes in the header) gets discarded and not broadcast forward.
func TestBroadcastMalformedBlock65(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH65) }
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }

func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
Expand Down
47 changes: 32 additions & 15 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,39 @@ type Decoder interface {
Time() time.Time
}

var eth65 = map[uint64]msgHandler{
GetBlockHeadersMsg: handleGetBlockHeaders,
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetNodeDataMsg: handleGetNodeData,
NodeDataMsg: handleNodeData,
GetReceiptsMsg: handleGetReceipts,
ReceiptsMsg: handleReceipts,
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
}

var eth66 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetNodeDataMsg: handleGetNodeData66,
NodeDataMsg: handleNodeData66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
// eth66 messages with request-id
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetNodeDataMsg: handleGetNodeData66,
NodeDataMsg: handleNodeData66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
}

// handleMessage is invoked whenever an inbound message is received from a remote
Expand All @@ -201,11 +219,10 @@ func handleMessage(backend Backend, peer *Peer) error {
}
defer msg.Discard()

var handlers = eth66
//if peer.Version() >= ETH67 { // Left in as a sample when new protocol is added
// handlers = eth67
//}

var handlers = eth65
if peer.Version() >= ETH66 {
handlers = eth66
}
// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
Expand Down
Loading

0 comments on commit c368f72

Please sign in to comment.