Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] add extension in eth protocol handshake to disable tx broadcast #412

Merged
merged 2 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
Expand All @@ -44,7 +46,6 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
lru "github.com/hashicorp/golang-lru"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just fix the imports

)

var (
Expand Down
23 changes: 12 additions & 11 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}

if eth.handler, err = newHandler(&handlerConfig{
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Checkpoint: checkpoint,
Whitelist: config.Whitelist,
DirectBroadcast: config.DirectBroadcast,
DiffSync: config.DiffSync,
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Checkpoint: checkpoint,
Whitelist: config.Whitelist,
DirectBroadcast: config.DirectBroadcast,
DiffSync: config.DiffSync,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
}); err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
Expand All @@ -471,7 +471,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
Expand All @@ -485,7 +485,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.receiptThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
Expand All @@ -499,7 +499,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.stateThroughput
}
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
return ps.idlePeers(eth.ETH65, eth.ETH67, idle, throughput)
}

// idlePeers retrieves a flat list of all currently idle peers satisfying the
Expand Down
5 changes: 3 additions & 2 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ type Config struct {
Genesis *core.Genesis `toml:",omitempty"`

// Protocol options
NetworkId uint64 // Network ID to use for selecting peers to connect to
SyncMode downloader.SyncMode
NetworkId uint64 // Network ID to use for selecting peers to connect to
SyncMode downloader.SyncMode
DisablePeerTxBroadcast bool

// This can be set to list of enrtree:// URLs which will be queried for
// for nodes to connect to.
Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 29 additions & 26 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,24 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
DiffSync bool // Whether to diff sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
DirectBroadcast bool
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
DiffSync bool // Whether to diff sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
DirectBroadcast bool
DisablePeerTxBroadcast bool
}

type handler struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
disablePeerTxBroadcast bool

fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol
Expand Down Expand Up @@ -138,18 +140,19 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
directBroadcast: config.DirectBroadcast,
diffSync: config.DiffSync,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
directBroadcast: config.DirectBroadcast,
diffSync: config.DiffSync,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the fast
Expand Down Expand Up @@ -276,7 +279,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
td = h.chain.GetTd(hash, number)
)
forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
Expand Down
10 changes: 5 additions & 5 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Send the transaction to the sink and verify that it's added to the tx pool
Expand Down Expand Up @@ -333,7 +333,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -532,7 +532,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Connect a new peer and check that we receive the checkpoint challenge
Expand Down Expand Up @@ -616,7 +616,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
go eth.Handle(sink, sinkPeer)
Expand Down Expand Up @@ -689,7 +689,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
genesis = source.chain.Genesis()
td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
)
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down
6 changes: 6 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (p *Peer) broadcastTransactions() {
case <-fail:
failed = true

case <-p.txTerm:
return

case <-p.term:
return
}
Expand Down Expand Up @@ -189,6 +192,9 @@ func (p *Peer) announceTransactions() {
case <-fail:
failed = true

case <-p.txTerm:
return

case <-p.term:
return
}
Expand Down
62 changes: 61 additions & 1 deletion eth/protocols/eth/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error {
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
// Send out own handshake in a new thread
errc := make(chan error, 2)

Expand Down Expand Up @@ -68,6 +68,49 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
}
p.td, p.head = status.TD, status.Head

if p.version >= ETH67 {
var upgradeStatus UpgradeStatusPacket // safe to read after two values have been received from errc
if extension == nil {
extension = &UpgradeStatusExtension{}
}
extensionRaw, err := extension.Encode()
if err != nil {
return err
}

gopool.Submit(func() {
errc <- p2p.Send(p.rw, UpgradeStatusMsg, &UpgradeStatusPacket{
Extension: extensionRaw,
})
})
gopool.Submit(func() {
errc <- p.readUpgradeStatus(&upgradeStatus)
})
timeout := time.NewTimer(handshakeTimeout)
defer timeout.Stop()
for i := 0; i < 2; i++ {
select {
case err := <-errc:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need make sure errc is clear before read from it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the errc is not empty, the upper code should complain

if err != nil {
return err
}
case <-timeout.C:
return p2p.DiscReadTimeout
}
}

extension, err := upgradeStatus.GetExtension()
if err != nil {
return err
}
p.statusExtension = extension

if p.statusExtension.DisablePeerTxBroadcast {
p.Log().Debug("peer does not need broadcast txs, closing broadcast routines")
p.CloseTxBroadcast()
}
}

// TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times
// larger, it will still fit within 100 bits
if tdlen := p.td.BitLen(); tdlen > 100 {
Expand Down Expand Up @@ -106,3 +149,20 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H
}
return nil
}

func (p *Peer) readUpgradeStatus(status *UpgradeStatusPacket) error {
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
if msg.Code != UpgradeStatusMsg {
return fmt.Errorf("%w: upgrade status msg has code %x (!= %x)", errNoStatusMsg, msg.Code, UpgradeStatusMsg)
}
if msg.Size > maxMessageSize {
return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize)
}
if err := msg.Decode(&status); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return nil
}
2 changes: 1 addition & 1 deletion eth/protocols/eth/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func testHandshake(t *testing.T, protocol uint) {
// Send the junk test with one peer, check the handshake failure
go p2p.Send(app, test.code, test.data)

err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain))
err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
if err == nil {
t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
} else if !errors.Is(err, test.want) {
Expand Down
Loading