diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1772913c0e..fc6848c9ee 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -754,6 +754,13 @@ var ( Usage: "Gas price below which gpo will ignore transactions", Value: ethconfig.Defaults.GPO.IgnorePrice.Int64(), } + // flag to set the transaction fetcher's txArrivalWait value, which is the maximum waiting + // period the fetcher will wait to receive an announced tx before explicitly requesting it + TxArrivalWaitFlag = cli.DurationFlag{ + Name: "txarrivalwait", + Usage: "Maximum duration to wait for a transaction before requesting it (defaults to 500ms)", + Value: node.DefaultConfig.P2P.TxArrivalWait, + } // Metrics flags MetricsEnabledFlag = cli.BoolFlag{ @@ -1288,6 +1295,10 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) { cfg.NoDiscovery = true cfg.DiscoveryV5 = false } + + if ctx.GlobalIsSet(TxArrivalWaitFlag.Name) { + cfg.TxArrivalWait = TxArrivalWaitFlag.Value + } } // SetNodeConfig applies node-related command line flags to the config. diff --git a/docs/cli/example_config.toml b/docs/cli/example_config.toml index d202a1cf82..bffbd77631 100644 --- a/docs/cli/example_config.toml +++ b/docs/cli/example_config.toml @@ -29,15 +29,16 @@ devfakeauthor = false # Run miner without validator set authorization debug = true # Prepends log messages with call-site location (file and line number) - {requires some effort} [p2p] - maxpeers = 50 # Maximum number of network peers (network disabled if set to 0) - maxpendpeers = 50 # Maximum number of pending connection attempts - bind = "0.0.0.0" # Network binding address - port = 30303 # Network listening port - nodiscover = false # Disables the peer discovery mechanism (manual peer addition) - nat = "any" # NAT port mapping mechanism (any|none|upnp|pmp|extip:) + maxpeers = 50 # Maximum number of network peers (network disabled if set to 0) + maxpendpeers = 50 # Maximum number of pending connection attempts + bind = "0.0.0.0" # Network binding address + port = 30303 # Network listening port + nodiscover = false # Disables the peer discovery mechanism (manual peer addition) + nat = "any" # NAT port mapping mechanism (any|none|upnp|pmp|extip:) netrestrict = "" # Restricts network communication to the given IP networks (CIDR masks) nodekey = "" # P2P node key file nodekeyhex = "" # P2P node key as hex + txarrivalwait = "500ms" # Maximum duration to wait before requesting an announced transaction [p2p.discovery] v5disc = false # Enables the experimental RLPx V5 (Topic Discovery) mechanism bootnodes = [] # Comma separated enode URLs for P2P discovery bootstrap diff --git a/docs/cli/server.md b/docs/cli/server.md index 1c5d85d1a1..2f049ec210 100644 --- a/docs/cli/server.md +++ b/docs/cli/server.md @@ -216,6 +216,8 @@ The ```bor server``` command runs the Bor client. - ```v5disc```: Enables the experimental RLPx V5 (Topic Discovery) mechanism (default: false) +- ```txarrivalwait```: Maximum duration to wait before requesting an announced transaction (default: 500ms) + ### Sealer Options - ```mine```: Enable mining (default: false) diff --git a/eth/backend.go b/eth/backend.go index 869566a7ac..fa5d40780d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -268,6 +268,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { EthAPI: ethAPI, PeerRequiredBlocks: config.PeerRequiredBlocks, checker: checker, + txArrivalWait: eth.p2pServer.TxArrivalWait, }); err != nil { return nil, err } diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index b10c0db9ee..8bdb1547e8 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -53,13 +53,13 @@ const ( // re-request them. maxTxUnderpricedSetSize = 32768 - // txArriveTimeout is the time allowance before an announced transaction is - // explicitly requested. - txArriveTimeout = 500 * time.Millisecond - // txGatherSlack is the interval used to collate almost-expired announces // with network fetches. txGatherSlack = 100 * time.Millisecond + + // maxTxArrivalWait is the longest acceptable duration for the txArrivalWait + // configuration value. Longer config values will default to this. + maxTxArrivalWait = 500 * time.Millisecond ) var ( @@ -176,38 +176,41 @@ type TxFetcher struct { step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Time wrapper to simulate in tests rand *mrand.Rand // Randomizer to use in tests instead of map range loops (soft-random) + + txArrivalWait time.Duration // txArrivalWait is the time allowance before an announced transaction is explicitly requested. } // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. -func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher { - return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil) +func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, txArrivalWait time.Duration) *TxFetcher { + return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil, txArrivalWait) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. func NewTxFetcherForTests( hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, - clock mclock.Clock, rand *mrand.Rand) *TxFetcher { + clock mclock.Clock, rand *mrand.Rand, txArrivalWait time.Duration) *TxFetcher { return &TxFetcher{ - notify: make(chan *txAnnounce), - cleanup: make(chan *txDelivery), - drop: make(chan *txDrop), - quit: make(chan struct{}), - waitlist: make(map[common.Hash]map[string]struct{}), - waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]struct{}), - announces: make(map[string]map[common.Hash]struct{}), - announced: make(map[common.Hash]map[string]struct{}), - fetching: make(map[common.Hash]string), - requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), - underpriced: mapset.NewSet(), - hasTx: hasTx, - addTxs: addTxs, - fetchTxs: fetchTxs, - clock: clock, - rand: rand, + notify: make(chan *txAnnounce), + cleanup: make(chan *txDelivery), + drop: make(chan *txDrop), + quit: make(chan struct{}), + waitlist: make(map[common.Hash]map[string]struct{}), + waittime: make(map[common.Hash]mclock.AbsTime), + waitslots: make(map[string]map[common.Hash]struct{}), + announces: make(map[string]map[common.Hash]struct{}), + announced: make(map[common.Hash]map[string]struct{}), + fetching: make(map[common.Hash]string), + requests: make(map[string]*txRequest), + alternates: make(map[common.Hash]map[string]struct{}), + underpriced: mapset.NewSet(), + hasTx: hasTx, + addTxs: addTxs, + fetchTxs: fetchTxs, + clock: clock, + rand: rand, + txArrivalWait: txArrivalWait, } } @@ -333,6 +336,16 @@ func (f *TxFetcher) Drop(peer string) error { // Start boots up the announcement based synchroniser, accepting and processing // hash notifications and block fetches until termination requested. func (f *TxFetcher) Start() { + // the txArrivalWait duration should not be less than the txGatherSlack duration + if f.txArrivalWait < txGatherSlack { + f.txArrivalWait = txGatherSlack + } + + // the txArrivalWait duration should not be greater than the maxTxArrivalWait duration + if f.txArrivalWait > maxTxArrivalWait { + f.txArrivalWait = maxTxArrivalWait + } + go f.loop() } @@ -350,6 +363,9 @@ func (f *TxFetcher) loop() { waitTrigger = make(chan struct{}, 1) timeoutTrigger = make(chan struct{}, 1) ) + + log.Info("TxFetcher", "txArrivalWait", f.txArrivalWait.String()) + for { select { case ann := <-f.notify: @@ -441,7 +457,7 @@ func (f *TxFetcher) loop() { // ones into the retrieval queues actives := make(map[string]struct{}) for hash, instance := range f.waittime { - if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout { + if time.Duration(f.clock.Now()-instance)+txGatherSlack > f.txArrivalWait { // Transaction expired without propagation, schedule for retrieval if f.announced[hash] != nil { panic("announce tracker already contains waitlist item") @@ -698,14 +714,16 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) { for _, instance := range f.waittime { if earliest > instance { earliest = instance - if txArriveTimeout-time.Duration(now-earliest) < gatherSlack { + if f.txArrivalWait-time.Duration(now-earliest) < gatherSlack { break } } } - *timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() { - trigger <- struct{}{} - }) + + *timer = f.clock.AfterFunc( + f.txArrivalWait-time.Duration(now-earliest), + func() { trigger <- struct{}{} }, + ) } // rescheduleTimeout iterates over all the transactions currently in flight and diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 796d4caf0f..b3c3ee3bb7 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -38,7 +38,8 @@ var ( types.NewTransaction(9828766684487745566, common.Address{0xac}, new(big.Int), 0, new(big.Int), nil), } // testTxsHashes is the hashes of the test transactions above - testTxsHashes = []common.Hash{testTxs[0].Hash(), testTxs[1].Hash(), testTxs[2].Hash(), testTxs[3].Hash()} + testTxsHashes = []common.Hash{testTxs[0].Hash(), testTxs[1].Hash(), testTxs[2].Hash(), testTxs[3].Hash()} + testTxArrivalWait = 500 * time.Millisecond ) type doTxNotify struct { @@ -81,6 +82,7 @@ func TestTransactionFetcherWaiting(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -113,7 +115,7 @@ func TestTransactionFetcherWaiting(t *testing.T) { // Wait for the arrival timeout which should move all expired items // from the wait list to the scheduler - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -132,7 +134,7 @@ func TestTransactionFetcherWaiting(t *testing.T) { isWaiting(map[string][]common.Hash{ "C": {{0x06}, {0x07}}, }), - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isScheduled{ tracking: map[string][]common.Hash{ "A": {{0x01}, {0x02}, {0x03}, {0x05}}, @@ -171,6 +173,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -181,7 +184,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -234,6 +237,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -244,7 +248,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -268,7 +272,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { "A": {{0x01}, {0x02}}, }, }, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -314,6 +318,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { <-proceed return errors.New("peer disconnected") }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -324,7 +329,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -383,6 +388,7 @@ func TestTransactionFetcherCleanup(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -393,7 +399,7 @@ func TestTransactionFetcherCleanup(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -422,6 +428,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -432,7 +439,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -460,6 +467,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -470,7 +478,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -506,6 +514,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -516,7 +525,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -544,14 +553,15 @@ func TestTransactionFetcherBroadcasts(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Set up three transactions to be in different stats, waiting, queued and fetching doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}}, isWaiting(map[string][]common.Hash{ @@ -592,6 +602,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -600,7 +611,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { "A": {{0x01}}, }), isScheduled{nil, nil, nil}, - doWait{time: txArriveTimeout / 2, step: false}, + doWait{time: testTxArrivalWait / 2, step: false}, isWaiting(map[string][]common.Hash{ "A": {{0x01}}, }), @@ -611,7 +622,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { "A": {{0x01}, {0x02}}, }), isScheduled{nil, nil, nil}, - doWait{time: txArriveTimeout / 2, step: true}, + doWait{time: testTxArrivalWait / 2, step: true}, isWaiting(map[string][]common.Hash{ "A": {{0x02}}, }), @@ -624,7 +635,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { }, }, - doWait{time: txArriveTimeout / 2, step: true}, + doWait{time: testTxArrivalWait / 2, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -649,6 +660,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -659,7 +671,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -681,7 +693,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { }, // Ensure that followup announcements don't get scheduled doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isScheduled{ tracking: map[string][]common.Hash{ "A": {testTxsHashes[1]}, @@ -714,13 +726,14 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ @@ -733,7 +746,7 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { "B": {{0x02}}, }, }, - doWait{time: txFetchTimeout - txArriveTimeout, step: true}, + doWait{time: txFetchTimeout - testTxArrivalWait, step: true}, isScheduled{ tracking: map[string][]common.Hash{ "B": {{0x02}}, @@ -745,7 +758,7 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { "A": {}, }, }, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isScheduled{ tracking: nil, fetching: nil, @@ -773,13 +786,14 @@ func TestTransactionFetcherRateLimiting(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Announce all the transactions, wait a bit and ensure only a small // percentage gets requested doTxNotify{peer: "A", hashes: hashes}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -811,13 +825,14 @@ func TestTransactionFetcherDoSProtection(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Announce half of the transaction and wait for them to be scheduled doTxNotify{peer: "A", hashes: hashesA[:maxTxAnnounces/2]}, doTxNotify{peer: "B", hashes: hashesB[:maxTxAnnounces/2-1]}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, // Announce the second half and keep them in the wait list doTxNotify{peer: "A", hashes: hashesA[maxTxAnnounces/2 : maxTxAnnounces]}, @@ -878,12 +893,13 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) { return errs }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Deliver a transaction through the fetcher, but reject as underpriced doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}, direct: true}, isScheduled{nil, nil, nil}, @@ -921,7 +937,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { steps = append(steps, isWaiting(map[string][]common.Hash{ "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], })) - steps = append(steps, doWait{time: txArriveTimeout, step: true}) + steps = append(steps, doWait{time: testTxArrivalWait, step: true}) steps = append(steps, isScheduled{ tracking: map[string][]common.Hash{ "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], @@ -947,12 +963,13 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { return errs }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: append(steps, []interface{}{ // The preparation of the test has already been done in `steps`, add the last check doTxNotify{peer: "A", hashes: []common.Hash{hashes[maxTxUnderpricedSetSize]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{txs[maxTxUnderpricedSetSize]}, direct: true}, isUnderpriced(maxTxUnderpricedSetSize), }...), @@ -969,6 +986,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -981,9 +999,9 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) { // Set up a few hashes into various stages doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}}, isWaiting(map[string][]common.Hash{ @@ -1022,14 +1040,15 @@ func TestTransactionFetcherDrop(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Set up a few hashes into various stages doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{{0x03}}}, isWaiting(map[string][]common.Hash{ @@ -1050,7 +1069,7 @@ func TestTransactionFetcherDrop(t *testing.T) { // Push the node into a dangling (timeout) state doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -1088,12 +1107,13 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Set up a few hashes into various stages doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "B", hashes: []common.Hash{{0x01}}}, isWaiting(nil), @@ -1133,12 +1153,13 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more and crash via a timeout @@ -1160,17 +1181,18 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more, re-fetch, and crash via a drop and timeout doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doDrop("A"), doWait{time: txFetchTimeout, step: true}, }, @@ -1189,6 +1211,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -1199,7 +1222,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) { // Notify the dangling transaction once more, partially deliver, clash&crash with a timeout doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true}, doWait{time: txFetchTimeout, step: true}, @@ -1225,17 +1248,18 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) { <-proceed return errors.New("peer disconnected") }, + testTxArrivalWait, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more, re-fetch, and crash via an in-flight disconnect doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doFunc(func() { proceed <- struct{}{} // Allow peer A to return the failure }), diff --git a/eth/handler.go b/eth/handler.go index 48bdf8eb15..b58fab1773 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -93,6 +93,7 @@ type handlerConfig struct { PeerRequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges checker ethereum.ChainValidator + txArrivalWait time.Duration // Maximum duration to wait for an announced tx before requesting it } type handler struct { @@ -307,7 +308,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } return p.RequestTxs(hashes) } - h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, h.txpool.AddRemotes, fetchTx) + h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, h.txpool.AddRemotes, fetchTx, config.txArrivalWait) h.chainSync = newChainSyncer(h) return h, nil } diff --git a/internal/cli/dumpconfig.go b/internal/cli/dumpconfig.go index 55495bf22a..0cd0958ae9 100644 --- a/internal/cli/dumpconfig.go +++ b/internal/cli/dumpconfig.go @@ -66,6 +66,7 @@ func (c *DumpconfigCommand) Run(args []string) int { userConfig.Gpo.IgnorePriceRaw = userConfig.Gpo.IgnorePrice.String() userConfig.Cache.RejournalRaw = userConfig.Cache.Rejournal.String() userConfig.Cache.TrieTimeoutRaw = userConfig.Cache.TrieTimeout.String() + userConfig.P2P.TxArrivalWaitRaw = userConfig.P2P.TxArrivalWait.String() if err := toml.NewEncoder(os.Stdout).Encode(userConfig); err != nil { c.UI.Error(err.Error()) diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 964770e503..5ca9d4eb15 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -204,6 +204,11 @@ type P2PConfig struct { // Discovery has the p2p discovery related settings Discovery *P2PDiscovery `hcl:"discovery,block" toml:"discovery,block"` + + // TxArrivalWait sets the maximum duration the transaction fetcher will wait for + // an announced transaction to arrive before explicitly requesting it + TxArrivalWait time.Duration `hcl:"-,optional" toml:"-"` + TxArrivalWaitRaw string `hcl:"txarrivalwait,optional" toml:"txarrivalwait,optional"` } type P2PDiscovery struct { @@ -583,13 +588,14 @@ func DefaultConfig() *Config { RPCBatchLimit: 100, RPCReturnDataLimit: 100000, P2P: &P2PConfig{ - MaxPeers: 50, - MaxPendPeers: 50, - Bind: "0.0.0.0", - Port: 30303, - NoDiscover: false, - NAT: "any", - NetRestrict: "", + MaxPeers: 50, + MaxPendPeers: 50, + Bind: "0.0.0.0", + Port: 30303, + NoDiscover: false, + NAT: "any", + NetRestrict: "", + TxArrivalWait: 500 * time.Millisecond, Discovery: &P2PDiscovery{ V5Enabled: false, Bootnodes: []string{}, @@ -797,6 +803,7 @@ func (c *Config) fillTimeDurations() error { {"txpool.rejournal", &c.TxPool.Rejournal, &c.TxPool.RejournalRaw}, {"cache.rejournal", &c.Cache.Rejournal, &c.Cache.RejournalRaw}, {"cache.timeout", &c.Cache.TrieTimeout, &c.Cache.TrieTimeoutRaw}, + {"p2p.txarrivalwait", &c.P2P.TxArrivalWait, &c.P2P.TxArrivalWaitRaw}, } for _, x := range tds { @@ -1260,6 +1267,7 @@ func (c *Config) buildNode() (*node.Config, error) { MaxPendingPeers: int(c.P2P.MaxPendPeers), ListenAddr: c.P2P.Bind + ":" + strconv.Itoa(int(c.P2P.Port)), DiscoveryV5: c.P2P.Discovery.V5Enabled, + TxArrivalWait: c.P2P.TxArrivalWait, }, HTTPModules: c.JsonRPC.Http.API, HTTPCors: c.JsonRPC.Http.Cors, diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index 7e68f249eb..0c38da3823 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -738,6 +738,13 @@ func (c *Command) Flags() *flagset.Flagset { Default: c.cliConfig.P2P.Discovery.V5Enabled, Group: "P2P", }) + f.DurationFlag(&flagset.DurationFlag{ + Name: "txarrivalwait", + Usage: "Maximum duration to wait for a transaction before explicitly requesting it (defaults to 500ms)", + Value: &c.cliConfig.P2P.TxArrivalWait, + Default: c.cliConfig.P2P.TxArrivalWait, + Group: "P2P", + }) // metrics f.BoolFlag(&flagset.BoolFlag{ diff --git a/node/defaults.go b/node/defaults.go index fd0277e29d..a32fa868ef 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -21,6 +21,7 @@ import ( "os/user" "path/filepath" "runtime" + "time" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/nat" @@ -60,9 +61,10 @@ var DefaultConfig = Config{ WSModules: []string{"net", "web3"}, GraphQLVirtualHosts: []string{"localhost"}, P2P: p2p.Config{ - ListenAddr: ":30303", - MaxPeers: 50, - NAT: nat.Any(), + ListenAddr: ":30303", + MaxPeers: 50, + NAT: nat.Any(), + TxArrivalWait: 500 * time.Millisecond, }, } diff --git a/p2p/server.go b/p2p/server.go index 7de8504bdc..49ed77bb39 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -156,6 +156,10 @@ type Config struct { Logger log.Logger `toml:",omitempty"` clock mclock.Clock + + // TxArrivalWait is the duration (ms) that the node will wait after seeing + // an announced transaction before explicitly requesting it + TxArrivalWait time.Duration } // Server manages all peer connections. diff --git a/packaging/templates/mainnet-v1/archive/config.toml b/packaging/templates/mainnet-v1/archive/config.toml index 7326ca13e1..b2b6d769bf 100644 --- a/packaging/templates/mainnet-v1/archive/config.toml +++ b/packaging/templates/mainnet-v1/archive/config.toml @@ -31,6 +31,7 @@ gcmode = "archive" # netrestrict = "" # nodekey = "" # nodekeyhex = "" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml b/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml index 09125d4aff..f2482e0cc6 100644 --- a/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml +++ b/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml @@ -31,6 +31,7 @@ syncmode = "full" # netrestrict = "" # nodekey = "" # nodekeyhex = "" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml b/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml index 59d0ef9672..08a113b08b 100644 --- a/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml +++ b/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml @@ -33,6 +33,7 @@ syncmode = "full" # netrestrict = "" # nodekey = "" # nodekeyhex = "" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/mainnet-v1/without-sentry/bor/config.toml b/packaging/templates/mainnet-v1/without-sentry/bor/config.toml index 00bdca179d..03c34dbea6 100644 --- a/packaging/templates/mainnet-v1/without-sentry/bor/config.toml +++ b/packaging/templates/mainnet-v1/without-sentry/bor/config.toml @@ -33,6 +33,7 @@ syncmode = "full" # netrestrict = "" # nodekey = "" # nodekeyhex = "" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/testnet-v4/archive/config.toml b/packaging/templates/testnet-v4/archive/config.toml index 6b8c13610b..5831169eff 100644 --- a/packaging/templates/testnet-v4/archive/config.toml +++ b/packaging/templates/testnet-v4/archive/config.toml @@ -31,6 +31,7 @@ gcmode = "archive" # netrestrict = "" # nodekey = "" # nodekeyhex = "" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml b/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml index b9632fe336..c4ca1fa915 100644 --- a/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml +++ b/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml @@ -31,6 +31,7 @@ syncmode = "full" # netrestrict = "" # nodekey = "" # nodekeyhex = "" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/testnet-v4/sentry/validator/bor/config.toml b/packaging/templates/testnet-v4/sentry/validator/bor/config.toml index 8dc6daa5ec..b39e0827a5 100644 --- a/packaging/templates/testnet-v4/sentry/validator/bor/config.toml +++ b/packaging/templates/testnet-v4/sentry/validator/bor/config.toml @@ -33,6 +33,7 @@ syncmode = "full" # netrestrict = "" # nodekey = "" # nodekeyhex = "" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/testnet-v4/without-sentry/bor/config.toml b/packaging/templates/testnet-v4/without-sentry/bor/config.toml index 97a9162e09..eb34a264aa 100644 --- a/packaging/templates/testnet-v4/without-sentry/bor/config.toml +++ b/packaging/templates/testnet-v4/without-sentry/bor/config.toml @@ -33,6 +33,7 @@ syncmode = "full" # netrestrict = "" # nodekey = "" # nodekeyhex = "" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/scripts/getconfig.go b/scripts/getconfig.go index 0d44a84016..c609fbb606 100644 --- a/scripts/getconfig.go +++ b/scripts/getconfig.go @@ -172,6 +172,7 @@ var nameTagMap = map[string]string{ "bootnodes": "bootnodes", "maxpeers": "maxpeers", "maxpendpeers": "maxpendpeers", + "txarrivalwait": "txarrivalwait", "nat": "nat", "nodiscover": "nodiscover", "v5disc": "v5disc", diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index d1d6fdc665..fc15e07c7e 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -83,7 +83,7 @@ func Fuzz(input []byte) int { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - clock, rand, + clock, rand, 500*time.Millisecond, ) f.Start() defer f.Stop()