From f6903b4f5775fa437a648efb131687eb21643e4b Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 4 Feb 2023 18:15:54 -0800 Subject: [PATCH 01/12] added flags, carried flag var through p2p config to backend start func to handler to fetcher. TODO verify default, expose var to api --- cmd/utils/flags.go | 10 ++++ docs/cli/server.md | 2 + eth/backend.go | 1 + eth/fetcher/tx_fetcher.go | 55 +++++++++--------- eth/fetcher/tx_fetcher_test.go | 102 ++++++++++++++++++++------------- eth/handler.go | 3 +- internal/cli/server/config.go | 17 ++++-- internal/cli/server/flags.go | 7 +++ node/defaults.go | 7 ++- p2p/server.go | 4 ++ scripts/getconfig.go | 1 + 11 files changed, 132 insertions(+), 77 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7641f8091f..318a98e017 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -754,6 +754,12 @@ var ( Usage: "Gas price below which gpo will ignore transactions", Value: ethconfig.Defaults.GPO.IgnorePrice.Int64(), } + // fetcher flag to set arrival timeout + TxArrivalWaitFlag = cli.IntFlag{ + Name: "txarrivalwait", + Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 100ms)", + Value: (int)(node.DefaultConfig.P2P.TxArrivalWait), + } // Metrics flags MetricsEnabledFlag = cli.BoolFlag{ @@ -1288,6 +1294,10 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) { cfg.NoDiscovery = true cfg.DiscoveryV5 = false } + + if ctx.GlobalIsSet(TxArrivalWaitFlag.Name) { + cfg.TxArrivalWait = (time.Duration)(TxArrivalWaitFlag.Value) * time.Millisecond + } } // SetNodeConfig applies node-related command line flags to the config. diff --git a/docs/cli/server.md b/docs/cli/server.md index 5bc0ff1024..7ec7251bfa 100644 --- a/docs/cli/server.md +++ b/docs/cli/server.md @@ -146,6 +146,8 @@ The ```bor server``` command runs the Bor client. - ```v5disc```: Enables the experimental RLPx V5 (Topic Discovery) mechanism (default: false) +- ```txarrivalwait```: Maximum number of milliseconds to wait before requesting an announced transaction (default: 100) + ### Sealer Options - ```mine```: Enable mining (default: false) diff --git a/eth/backend.go b/eth/backend.go index 824fec8914..ad00cfacd2 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -266,6 +266,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { EthAPI: ethAPI, PeerRequiredBlocks: config.PeerRequiredBlocks, checker: checker, + txArrivalWait: eth.p2pServer.TxArrivalWait, }); err != nil { return nil, err } diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 8b97746b14..7b55439011 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -53,10 +53,6 @@ const ( // re-request them. maxTxUnderpricedSetSize = 32768 - // txArriveTimeout is the time allowance before an announced transaction is - // explicitly requested. - txArriveTimeout = 100 * time.Millisecond - // txGatherSlack is the interval used to collate almost-expired announces // with network fetches. txGatherSlack = 20 * time.Millisecond @@ -176,38 +172,41 @@ type TxFetcher struct { step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Time wrapper to simulate in tests rand *mrand.Rand // Randomizer to use in tests instead of map range loops (soft-random) + + txArrivalWait time.Duration // txArrivalWait is the time allowance before an announced transaction is explicitly requested. } // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. -func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher { - return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil) +func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, txArrivalWait time.Duration) *TxFetcher { + return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil, txArrivalWait) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. func NewTxFetcherForTests( hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, - clock mclock.Clock, rand *mrand.Rand) *TxFetcher { + clock mclock.Clock, rand *mrand.Rand, txArrivalWait time.Duration) *TxFetcher { return &TxFetcher{ - notify: make(chan *txAnnounce), - cleanup: make(chan *txDelivery), - drop: make(chan *txDrop), - quit: make(chan struct{}), - waitlist: make(map[common.Hash]map[string]struct{}), - waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]struct{}), - announces: make(map[string]map[common.Hash]struct{}), - announced: make(map[common.Hash]map[string]struct{}), - fetching: make(map[common.Hash]string), - requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), - underpriced: mapset.NewSet(), - hasTx: hasTx, - addTxs: addTxs, - fetchTxs: fetchTxs, - clock: clock, - rand: rand, + notify: make(chan *txAnnounce), + cleanup: make(chan *txDelivery), + drop: make(chan *txDrop), + quit: make(chan struct{}), + waitlist: make(map[common.Hash]map[string]struct{}), + waittime: make(map[common.Hash]mclock.AbsTime), + waitslots: make(map[string]map[common.Hash]struct{}), + announces: make(map[string]map[common.Hash]struct{}), + announced: make(map[common.Hash]map[string]struct{}), + fetching: make(map[common.Hash]string), + requests: make(map[string]*txRequest), + alternates: make(map[common.Hash]map[string]struct{}), + underpriced: mapset.NewSet(), + hasTx: hasTx, + addTxs: addTxs, + fetchTxs: fetchTxs, + clock: clock, + rand: rand, + txArrivalWait: txArrivalWait, } } @@ -441,7 +440,7 @@ func (f *TxFetcher) loop() { // ones into the retrieval queues actives := make(map[string]struct{}) for hash, instance := range f.waittime { - if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout { + if time.Duration(f.clock.Now()-instance)+txGatherSlack > f.txArrivalWait { // Transaction expired without propagation, schedule for retrieval if f.announced[hash] != nil { panic("announce tracker already contains waitlist item") @@ -698,12 +697,12 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) { for _, instance := range f.waittime { if earliest > instance { earliest = instance - if txArriveTimeout-time.Duration(now-earliest) < gatherSlack { + if f.txArrivalWait-time.Duration(now-earliest) < gatherSlack { break } } } - *timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() { + *timer = f.clock.AfterFunc(f.txArrivalWait-time.Duration(now-earliest), func() { trigger <- struct{}{} }) } diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 796d4caf0f..37fcc800ef 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -38,7 +38,8 @@ var ( types.NewTransaction(9828766684487745566, common.Address{0xac}, new(big.Int), 0, new(big.Int), nil), } // testTxsHashes is the hashes of the test transactions above - testTxsHashes = []common.Hash{testTxs[0].Hash(), testTxs[1].Hash(), testTxs[2].Hash(), testTxs[3].Hash()} + testTxsHashes = []common.Hash{testTxs[0].Hash(), testTxs[1].Hash(), testTxs[2].Hash(), testTxs[3].Hash()} + testTxArrivalWait = 100 * time.Millisecond ) type doTxNotify struct { @@ -81,6 +82,7 @@ func TestTransactionFetcherWaiting(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -113,7 +115,7 @@ func TestTransactionFetcherWaiting(t *testing.T) { // Wait for the arrival timeout which should move all expired items // from the wait list to the scheduler - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -132,7 +134,7 @@ func TestTransactionFetcherWaiting(t *testing.T) { isWaiting(map[string][]common.Hash{ "C": {{0x06}, {0x07}}, }), - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isScheduled{ tracking: map[string][]common.Hash{ "A": {{0x01}, {0x02}, {0x03}, {0x05}}, @@ -171,6 +173,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -181,7 +184,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -234,6 +237,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -244,7 +248,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -268,7 +272,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { "A": {{0x01}, {0x02}}, }, }, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -314,6 +318,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { <-proceed return errors.New("peer disconnected") }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -324,7 +329,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -383,6 +388,7 @@ func TestTransactionFetcherCleanup(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -393,7 +399,7 @@ func TestTransactionFetcherCleanup(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -422,6 +428,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -432,7 +439,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -460,6 +467,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -470,7 +478,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -506,6 +514,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -516,7 +525,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -544,14 +553,15 @@ func TestTransactionFetcherBroadcasts(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Set up three transactions to be in different stats, waiting, queued and fetching doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}}, isWaiting(map[string][]common.Hash{ @@ -592,6 +602,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -600,7 +611,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { "A": {{0x01}}, }), isScheduled{nil, nil, nil}, - doWait{time: txArriveTimeout / 2, step: false}, + doWait{time: testTxArrivalWait / 2, step: false}, isWaiting(map[string][]common.Hash{ "A": {{0x01}}, }), @@ -611,7 +622,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { "A": {{0x01}, {0x02}}, }), isScheduled{nil, nil, nil}, - doWait{time: txArriveTimeout / 2, step: true}, + doWait{time: testTxArrivalWait / 2, step: true}, isWaiting(map[string][]common.Hash{ "A": {{0x02}}, }), @@ -624,7 +635,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { }, }, - doWait{time: txArriveTimeout / 2, step: true}, + doWait{time: testTxArrivalWait / 2, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -649,6 +660,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -659,7 +671,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { }), isScheduled{tracking: nil, fetching: nil}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -681,7 +693,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { }, // Ensure that followup announcements don't get scheduled doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isScheduled{ tracking: map[string][]common.Hash{ "A": {testTxsHashes[1]}, @@ -714,13 +726,14 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ @@ -733,7 +746,7 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { "B": {{0x02}}, }, }, - doWait{time: txFetchTimeout - txArriveTimeout, step: true}, + doWait{time: txFetchTimeout - testTxArrivalWait, step: true}, isScheduled{ tracking: map[string][]common.Hash{ "B": {{0x02}}, @@ -745,7 +758,7 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { "A": {}, }, }, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isScheduled{ tracking: nil, fetching: nil, @@ -773,13 +786,14 @@ func TestTransactionFetcherRateLimiting(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Announce all the transactions, wait a bit and ensure only a small // percentage gets requested doTxNotify{peer: "A", hashes: hashes}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -811,13 +825,14 @@ func TestTransactionFetcherDoSProtection(t *testing.T) { func(common.Hash) bool { return false }, nil, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Announce half of the transaction and wait for them to be scheduled doTxNotify{peer: "A", hashes: hashesA[:maxTxAnnounces/2]}, doTxNotify{peer: "B", hashes: hashesB[:maxTxAnnounces/2-1]}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, // Announce the second half and keep them in the wait list doTxNotify{peer: "A", hashes: hashesA[maxTxAnnounces/2 : maxTxAnnounces]}, @@ -878,12 +893,13 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) { return errs }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Deliver a transaction through the fetcher, but reject as underpriced doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}, direct: true}, isScheduled{nil, nil, nil}, @@ -921,7 +937,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { steps = append(steps, isWaiting(map[string][]common.Hash{ "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], })) - steps = append(steps, doWait{time: txArriveTimeout, step: true}) + steps = append(steps, doWait{time: testTxArrivalWait, step: true}) steps = append(steps, isScheduled{ tracking: map[string][]common.Hash{ "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], @@ -947,12 +963,13 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { return errs }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: append(steps, []interface{}{ // The preparation of the test has already been done in `steps`, add the last check doTxNotify{peer: "A", hashes: []common.Hash{hashes[maxTxUnderpricedSetSize]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{txs[maxTxUnderpricedSetSize]}, direct: true}, isUnderpriced(maxTxUnderpricedSetSize), }...), @@ -969,6 +986,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -981,9 +999,9 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) { // Set up a few hashes into various stages doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}}, isWaiting(map[string][]common.Hash{ @@ -1022,14 +1040,15 @@ func TestTransactionFetcherDrop(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Set up a few hashes into various stages doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "A", hashes: []common.Hash{{0x03}}}, isWaiting(map[string][]common.Hash{ @@ -1050,7 +1069,7 @@ func TestTransactionFetcherDrop(t *testing.T) { // Push the node into a dangling (timeout) state doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, isWaiting(nil), isScheduled{ tracking: map[string][]common.Hash{ @@ -1088,12 +1107,13 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Set up a few hashes into various stages doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxNotify{peer: "B", hashes: []common.Hash{{0x01}}}, isWaiting(nil), @@ -1133,12 +1153,13 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more and crash via a timeout @@ -1160,17 +1181,18 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more, re-fetch, and crash via a drop and timeout doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doDrop("A"), doWait{time: txFetchTimeout, step: true}, }, @@ -1189,6 +1211,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, + testTxArrivalWait, ) }, steps: []interface{}{ @@ -1199,7 +1222,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) { // Notify the dangling transaction once more, partially deliver, clash&crash with a timeout doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true}, doWait{time: txFetchTimeout, step: true}, @@ -1225,17 +1248,18 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) { <-proceed return errors.New("peer disconnected") }, + testTxArrivalWait, ) }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more, re-fetch, and crash via an in-flight disconnect doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, - doWait{time: txArriveTimeout, step: true}, + doWait{time: testTxArrivalWait, step: true}, doFunc(func() { proceed <- struct{}{} // Allow peer A to return the failure }), diff --git a/eth/handler.go b/eth/handler.go index 48bdf8eb15..24f41e017a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -93,6 +93,7 @@ type handlerConfig struct { PeerRequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges checker ethereum.ChainValidator + txArrivalWait time.Duration // Max time in milliseconds to wait for an announced tx before requesting it } type handler struct { @@ -307,7 +308,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } return p.RequestTxs(hashes) } - h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, h.txpool.AddRemotes, fetchTx) + h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, h.txpool.AddRemotes, fetchTx, config.txArrivalWait) h.chainSync = newChainSyncer(h) return h, nil } diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 52461d9306..f75ca56a21 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -131,6 +131,9 @@ type P2PConfig struct { // Discovery has the p2p discovery related settings Discovery *P2PDiscovery `hcl:"discovery,block" toml:"discovery,block"` + + // TxArrivalWait sets the maximum wait for announced transactions + TxArrivalWait uint64 `hcl:"txarrivalwait,optional" toml:"txarrivalwait,optional"` } type P2PDiscovery struct { @@ -449,12 +452,13 @@ func DefaultConfig() *Config { DataDir: DefaultDataDir(), Ancient: "", P2P: &P2PConfig{ - MaxPeers: 50, - MaxPendPeers: 50, - Bind: "0.0.0.0", - Port: 30303, - NoDiscover: false, - NAT: "any", + MaxPeers: 50, + MaxPendPeers: 50, + Bind: "0.0.0.0", + Port: 30303, + NoDiscover: false, + NAT: "any", + TxArrivalWait: 100, Discovery: &P2PDiscovery{ V5Enabled: false, Bootnodes: []string{}, @@ -1047,6 +1051,7 @@ func (c *Config) buildNode() (*node.Config, error) { MaxPendingPeers: int(c.P2P.MaxPendPeers), ListenAddr: c.P2P.Bind + ":" + strconv.Itoa(int(c.P2P.Port)), DiscoveryV5: c.P2P.Discovery.V5Enabled, + TxArrivalWait: time.Duration(c.P2P.TxArrivalWait), }, HTTPModules: c.JsonRPC.Http.API, HTTPCors: c.JsonRPC.Http.Cors, diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index e52077da97..b3b33e47ef 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -548,6 +548,13 @@ func (c *Command) Flags() *flagset.Flagset { Default: c.cliConfig.P2P.Discovery.V5Enabled, Group: "P2P", }) + f.Uint64Flag(&flagset.Uint64Flag{ + Name: "txarrivalwait", + Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 100ms)", + Value: &c.cliConfig.P2P.TxArrivalWait, + Default: c.cliConfig.P2P.TxArrivalWait, + Group: "P2P", + }) // metrics f.BoolFlag(&flagset.BoolFlag{ diff --git a/node/defaults.go b/node/defaults.go index fd0277e29d..412278bc03 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -60,9 +60,10 @@ var DefaultConfig = Config{ WSModules: []string{"net", "web3"}, GraphQLVirtualHosts: []string{"localhost"}, P2P: p2p.Config{ - ListenAddr: ":30303", - MaxPeers: 50, - NAT: nat.Any(), + ListenAddr: ":30303", + MaxPeers: 50, + NAT: nat.Any(), + TxArrivalWait: 100, }, } diff --git a/p2p/server.go b/p2p/server.go index 138975e54b..c51ba3f5b7 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -156,6 +156,10 @@ type Config struct { Logger log.Logger `toml:",omitempty"` clock mclock.Clock + + // TxArrivalWait is the duration (ms) that the node will wait after seeing + // an announced transaction before explicitly requesting it + TxArrivalWait time.Duration } // Server manages all peer connections. diff --git a/scripts/getconfig.go b/scripts/getconfig.go index 09026a2479..caae916222 100644 --- a/scripts/getconfig.go +++ b/scripts/getconfig.go @@ -172,6 +172,7 @@ var nameTagMap = map[string]string{ "bootnodes": "bootnodes", "maxpeers": "maxpeers", "maxpendpeers": "maxpendpeers", + "txarrivalwait": "txarrivalwait", "nat": "nat", "nodiscover": "nodiscover", "v5disc": "v5disc", From 5496007cbafb31310cbf622c0b39af80b5e50eeb Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 7 Feb 2023 20:43:03 -0800 Subject: [PATCH 02/12] updated type conversion point to simplify - tested flag and no-flag (default) settings --- cmd/utils/flags.go | 4 ++-- eth/backend.go | 2 +- internal/cli/server/config.go | 2 +- p2p/server.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 318a98e017..3c3220ddc4 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -758,7 +758,7 @@ var ( TxArrivalWaitFlag = cli.IntFlag{ Name: "txarrivalwait", Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 100ms)", - Value: (int)(node.DefaultConfig.P2P.TxArrivalWait), + Value: node.DefaultConfig.P2P.TxArrivalWait, } // Metrics flags @@ -1296,7 +1296,7 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) { } if ctx.GlobalIsSet(TxArrivalWaitFlag.Name) { - cfg.TxArrivalWait = (time.Duration)(TxArrivalWaitFlag.Value) * time.Millisecond + cfg.TxArrivalWait = TxArrivalWaitFlag.Value } } diff --git a/eth/backend.go b/eth/backend.go index ad00cfacd2..c98b31966d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -266,7 +266,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { EthAPI: ethAPI, PeerRequiredBlocks: config.PeerRequiredBlocks, checker: checker, - txArrivalWait: eth.p2pServer.TxArrivalWait, + txArrivalWait: time.Duration(eth.p2pServer.TxArrivalWait) * time.Millisecond, }); err != nil { return nil, err } diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index f75ca56a21..79c4acf7ec 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -1051,7 +1051,7 @@ func (c *Config) buildNode() (*node.Config, error) { MaxPendingPeers: int(c.P2P.MaxPendPeers), ListenAddr: c.P2P.Bind + ":" + strconv.Itoa(int(c.P2P.Port)), DiscoveryV5: c.P2P.Discovery.V5Enabled, - TxArrivalWait: time.Duration(c.P2P.TxArrivalWait), + TxArrivalWait: int(c.P2P.TxArrivalWait), }, HTTPModules: c.JsonRPC.Http.API, HTTPCors: c.JsonRPC.Http.Cors, diff --git a/p2p/server.go b/p2p/server.go index c51ba3f5b7..0f9285b01a 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -159,7 +159,7 @@ type Config struct { // TxArrivalWait is the duration (ms) that the node will wait after seeing // an announced transaction before explicitly requesting it - TxArrivalWait time.Duration + TxArrivalWait int } // Server manages all peer connections. From b5ff754b70c15fa0955173b3d6ea660bce3b1fa2 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 9 Feb 2023 00:18:39 -0800 Subject: [PATCH 03/12] updated default txArrivalWait value to 500ms. updated commented lines to indicate the new default value. updated tx_fetcher_test to test using the new default value --- cmd/utils/flags.go | 2 +- docs/cli/server.md | 2 +- eth/fetcher/tx_fetcher_test.go | 2 +- internal/cli/server/config.go | 2 +- internal/cli/server/flags.go | 2 +- node/defaults.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 3c3220ddc4..82a79eeb61 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -757,7 +757,7 @@ var ( // fetcher flag to set arrival timeout TxArrivalWaitFlag = cli.IntFlag{ Name: "txarrivalwait", - Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 100ms)", + Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 500ms)", Value: node.DefaultConfig.P2P.TxArrivalWait, } diff --git a/docs/cli/server.md b/docs/cli/server.md index 7ec7251bfa..69c21232a8 100644 --- a/docs/cli/server.md +++ b/docs/cli/server.md @@ -146,7 +146,7 @@ The ```bor server``` command runs the Bor client. - ```v5disc```: Enables the experimental RLPx V5 (Topic Discovery) mechanism (default: false) -- ```txarrivalwait```: Maximum number of milliseconds to wait before requesting an announced transaction (default: 100) +- ```txarrivalwait```: Maximum number of milliseconds to wait before requesting an announced transaction (default: 500) ### Sealer Options diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 37fcc800ef..b3c3ee3bb7 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -39,7 +39,7 @@ var ( } // testTxsHashes is the hashes of the test transactions above testTxsHashes = []common.Hash{testTxs[0].Hash(), testTxs[1].Hash(), testTxs[2].Hash(), testTxs[3].Hash()} - testTxArrivalWait = 100 * time.Millisecond + testTxArrivalWait = 500 * time.Millisecond ) type doTxNotify struct { diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 79c4acf7ec..3a7102a686 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -458,7 +458,7 @@ func DefaultConfig() *Config { Port: 30303, NoDiscover: false, NAT: "any", - TxArrivalWait: 100, + TxArrivalWait: 500, Discovery: &P2PDiscovery{ V5Enabled: false, Bootnodes: []string{}, diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index b3b33e47ef..51ccbe0142 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -550,7 +550,7 @@ func (c *Command) Flags() *flagset.Flagset { }) f.Uint64Flag(&flagset.Uint64Flag{ Name: "txarrivalwait", - Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 100ms)", + Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 500ms)", Value: &c.cliConfig.P2P.TxArrivalWait, Default: c.cliConfig.P2P.TxArrivalWait, Group: "P2P", diff --git a/node/defaults.go b/node/defaults.go index 412278bc03..e7c148b09c 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -63,7 +63,7 @@ var DefaultConfig = Config{ ListenAddr: ":30303", MaxPeers: 50, NAT: nat.Any(), - TxArrivalWait: 100, + TxArrivalWait: 500, }, } From 3c23de2486a3eaee9f938aef883ee26c551c1dee Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 9 Feb 2023 01:52:19 -0800 Subject: [PATCH 04/12] changed txArrivalWait config type from int to time.Duration, changed flags to duration type. Tested on live both w/o flag set (default) and w/ flag set --- cmd/utils/flags.go | 7 ++++--- docs/cli/server.md | 2 +- eth/backend.go | 2 +- eth/handler.go | 2 +- internal/cli/server/config.go | 9 +++++---- internal/cli/server/flags.go | 4 ++-- node/defaults.go | 3 ++- p2p/server.go | 2 +- 8 files changed, 17 insertions(+), 14 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 82a79eeb61..3e139ff7d6 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -754,10 +754,11 @@ var ( Usage: "Gas price below which gpo will ignore transactions", Value: ethconfig.Defaults.GPO.IgnorePrice.Int64(), } - // fetcher flag to set arrival timeout - TxArrivalWaitFlag = cli.IntFlag{ + // flag to set the transaction fetcher's txArrivalWait value, which is the maximum waiting + // period the fetcher will wait to receive an announced tx before explicitly requesting it + TxArrivalWaitFlag = cli.DurationFlag{ Name: "txarrivalwait", - Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 500ms)", + Usage: "Maximum duration to wait for a transaction before requesting it (defaults to 500ms)", Value: node.DefaultConfig.P2P.TxArrivalWait, } diff --git a/docs/cli/server.md b/docs/cli/server.md index 69c21232a8..9775db4d6e 100644 --- a/docs/cli/server.md +++ b/docs/cli/server.md @@ -146,7 +146,7 @@ The ```bor server``` command runs the Bor client. - ```v5disc```: Enables the experimental RLPx V5 (Topic Discovery) mechanism (default: false) -- ```txarrivalwait```: Maximum number of milliseconds to wait before requesting an announced transaction (default: 500) +- ```txarrivalwait```: Maximum duration to wait before requesting an announced transaction (default: 500) ### Sealer Options diff --git a/eth/backend.go b/eth/backend.go index c98b31966d..ad00cfacd2 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -266,7 +266,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { EthAPI: ethAPI, PeerRequiredBlocks: config.PeerRequiredBlocks, checker: checker, - txArrivalWait: time.Duration(eth.p2pServer.TxArrivalWait) * time.Millisecond, + txArrivalWait: eth.p2pServer.TxArrivalWait, }); err != nil { return nil, err } diff --git a/eth/handler.go b/eth/handler.go index 24f41e017a..b58fab1773 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -93,7 +93,7 @@ type handlerConfig struct { PeerRequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges checker ethereum.ChainValidator - txArrivalWait time.Duration // Max time in milliseconds to wait for an announced tx before requesting it + txArrivalWait time.Duration // Maximum duration to wait for an announced tx before requesting it } type handler struct { diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 3a7102a686..15a8eac8ce 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -132,8 +132,9 @@ type P2PConfig struct { // Discovery has the p2p discovery related settings Discovery *P2PDiscovery `hcl:"discovery,block" toml:"discovery,block"` - // TxArrivalWait sets the maximum wait for announced transactions - TxArrivalWait uint64 `hcl:"txarrivalwait,optional" toml:"txarrivalwait,optional"` + // TxArrivalWait sets the maximum duration the transaction fetcher will wait for + // an announced transaction to arrive before explicitly requesting it + TxArrivalWait time.Duration `hcl:"txarrivalwait,optional" toml:"txarrivalwait,optional"` } type P2PDiscovery struct { @@ -458,7 +459,7 @@ func DefaultConfig() *Config { Port: 30303, NoDiscover: false, NAT: "any", - TxArrivalWait: 500, + TxArrivalWait: 500 * time.Millisecond, Discovery: &P2PDiscovery{ V5Enabled: false, Bootnodes: []string{}, @@ -1051,7 +1052,7 @@ func (c *Config) buildNode() (*node.Config, error) { MaxPendingPeers: int(c.P2P.MaxPendPeers), ListenAddr: c.P2P.Bind + ":" + strconv.Itoa(int(c.P2P.Port)), DiscoveryV5: c.P2P.Discovery.V5Enabled, - TxArrivalWait: int(c.P2P.TxArrivalWait), + TxArrivalWait: c.P2P.TxArrivalWait, }, HTTPModules: c.JsonRPC.Http.API, HTTPCors: c.JsonRPC.Http.Cors, diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index 51ccbe0142..613c8a105b 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -548,9 +548,9 @@ func (c *Command) Flags() *flagset.Flagset { Default: c.cliConfig.P2P.Discovery.V5Enabled, Group: "P2P", }) - f.Uint64Flag(&flagset.Uint64Flag{ + f.DurationFlag(&flagset.DurationFlag{ Name: "txarrivalwait", - Usage: "Maximum number of milliseconds to wait for a transaction before requesting it (defaults to 500ms)", + Usage: "Maximum duration to wait for a transaction before explicitly requesting it (defaults to 500ms)", Value: &c.cliConfig.P2P.TxArrivalWait, Default: c.cliConfig.P2P.TxArrivalWait, Group: "P2P", diff --git a/node/defaults.go b/node/defaults.go index e7c148b09c..a32fa868ef 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -21,6 +21,7 @@ import ( "os/user" "path/filepath" "runtime" + "time" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/nat" @@ -63,7 +64,7 @@ var DefaultConfig = Config{ ListenAddr: ":30303", MaxPeers: 50, NAT: nat.Any(), - TxArrivalWait: 500, + TxArrivalWait: 500 * time.Millisecond, }, } diff --git a/p2p/server.go b/p2p/server.go index 0f9285b01a..c51ba3f5b7 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -159,7 +159,7 @@ type Config struct { // TxArrivalWait is the duration (ms) that the node will wait after seeing // an announced transaction before explicitly requesting it - TxArrivalWait int + TxArrivalWait time.Duration } // Server manages all peer connections. From cfb9eb3d1991d5ded0fb4b26b5c3ee096947161f Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 9 Feb 2023 02:01:25 -0800 Subject: [PATCH 05/12] added missing ms to server.md (typo fix) --- docs/cli/server.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cli/server.md b/docs/cli/server.md index 9775db4d6e..ff89ff9e5c 100644 --- a/docs/cli/server.md +++ b/docs/cli/server.md @@ -146,7 +146,7 @@ The ```bor server``` command runs the Bor client. - ```v5disc```: Enables the experimental RLPx V5 (Topic Discovery) mechanism (default: false) -- ```txarrivalwait```: Maximum duration to wait before requesting an announced transaction (default: 500) +- ```txarrivalwait```: Maximum duration to wait before requesting an announced transaction (default: 500ms) ### Sealer Options From b8be0dace866c6ca1995eb2f344c2beaf0790d5c Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 9 Feb 2023 02:20:15 -0800 Subject: [PATCH 06/12] added log line to print the txArrivalWait value into the fetchers go routine process --- eth/fetcher/tx_fetcher.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 7b55439011..7d85e50bd2 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -349,6 +349,9 @@ func (f *TxFetcher) loop() { waitTrigger = make(chan struct{}, 1) timeoutTrigger = make(chan struct{}, 1) ) + + log.Info("TxFetcher", "txArrivalWait", f.txArrivalWait.String()) + for { select { case ann := <-f.notify: From d56370b9101717d7b48c5862416a8069df949174 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 9 Feb 2023 03:14:38 -0800 Subject: [PATCH 07/12] added check to set the txArrivalWait value to the txGatherSlack value if txArrivalWait < txGatherSlack --- eth/fetcher/tx_fetcher.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 7d85e50bd2..ad2ff13979 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -332,6 +332,11 @@ func (f *TxFetcher) Drop(peer string) error { // Start boots up the announcement based synchroniser, accepting and processing // hash notifications and block fetches until termination requested. func (f *TxFetcher) Start() { + // the txArrivalWait duration should not be less than the txGatherSlack duration + if f.txArrivalWait < txGatherSlack { + f.txArrivalWait = txGatherSlack + } + go f.loop() } From 070b13efbd5322c4a3a4a914144ac44ba87b4a55 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 14 Feb 2023 22:03:42 -0800 Subject: [PATCH 08/12] added config.toml example values to different config setups, added TxArrivalWaitRaw string for toml and hcl value inputs --- docs/cli/example_config.toml | 13 +++++++------ internal/cli/dumpconfig.go | 1 + internal/cli/server/config.go | 4 +++- packaging/templates/mainnet-v1/archive/config.toml | 1 + .../mainnet-v1/sentry/sentry/bor/config.toml | 1 + .../mainnet-v1/sentry/validator/bor/config.toml | 1 + .../mainnet-v1/without-sentry/bor/config.toml | 1 + packaging/templates/testnet-v4/archive/config.toml | 1 + .../testnet-v4/sentry/sentry/bor/config.toml | 1 + .../testnet-v4/sentry/validator/bor/config.toml | 1 + .../testnet-v4/without-sentry/bor/config.toml | 1 + 11 files changed, 19 insertions(+), 7 deletions(-) diff --git a/docs/cli/example_config.toml b/docs/cli/example_config.toml index 64ef60ae12..052353b18b 100644 --- a/docs/cli/example_config.toml +++ b/docs/cli/example_config.toml @@ -19,12 +19,13 @@ ethstats = "" # Reporting URL of a ethstats service (nodename:sec "32000000" = "0x875500011e5eecc0c554f95d07b31cf59df4ca2505f4dbbfffa7d4e4da917c68" [p2p] - maxpeers = 50 # Maximum number of network peers (network disabled if set to 0) - maxpendpeers = 50 # Maximum number of pending connection attempts - bind = "0.0.0.0" # Network binding address - port = 30303 # Network listening port - nodiscover = false # Disables the peer discovery mechanism (manual peer addition) - nat = "any" # NAT port mapping mechanism (any|none|upnp|pmp|extip:) + maxpeers = 50 # Maximum number of network peers (network disabled if set to 0) + maxpendpeers = 50 # Maximum number of pending connection attempts + bind = "0.0.0.0" # Network binding address + port = 30303 # Network listening port + nodiscover = false # Disables the peer discovery mechanism (manual peer addition) + nat = "any" # NAT port mapping mechanism (any|none|upnp|pmp|extip:) + txarrivalwait = "500ms" # Maximum duration to wait before requesting an announced transaction [p2p.discovery] v5disc = false # Enables the experimental RLPx V5 (Topic Discovery) mechanism bootnodes = [] # Comma separated enode URLs for P2P discovery bootstrap diff --git a/internal/cli/dumpconfig.go b/internal/cli/dumpconfig.go index a748af3357..c585afeb39 100644 --- a/internal/cli/dumpconfig.go +++ b/internal/cli/dumpconfig.go @@ -62,6 +62,7 @@ func (c *DumpconfigCommand) Run(args []string) int { userConfig.Gpo.IgnorePriceRaw = userConfig.Gpo.IgnorePrice.String() userConfig.Cache.RejournalRaw = userConfig.Cache.Rejournal.String() userConfig.Cache.TrieTimeoutRaw = userConfig.Cache.TrieTimeout.String() + userConfig.P2P.TxArrivalWaitRaw = userConfig.P2P.TxArrivalWait.String() if err := toml.NewEncoder(os.Stdout).Encode(userConfig); err != nil { c.UI.Error(err.Error()) diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 15a8eac8ce..453c5c2c24 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -134,7 +134,8 @@ type P2PConfig struct { // TxArrivalWait sets the maximum duration the transaction fetcher will wait for // an announced transaction to arrive before explicitly requesting it - TxArrivalWait time.Duration `hcl:"txarrivalwait,optional" toml:"txarrivalwait,optional"` + TxArrivalWait time.Duration `hcl:"-,optional" toml:"-"` + TxArrivalWaitRaw string `hcl:"txarrivalwait,optional" toml:"txarrivalwait,optional"` } type P2PDiscovery struct { @@ -636,6 +637,7 @@ func (c *Config) fillTimeDurations() error { {"txpool.rejournal", &c.TxPool.Rejournal, &c.TxPool.RejournalRaw}, {"cache.rejournal", &c.Cache.Rejournal, &c.Cache.RejournalRaw}, {"cache.timeout", &c.Cache.TrieTimeout, &c.Cache.TrieTimeoutRaw}, + {"p2p.txarrivalwait", &c.P2P.TxArrivalWait, &c.P2P.TxArrivalWaitRaw}, } for _, x := range tds { diff --git a/packaging/templates/mainnet-v1/archive/config.toml b/packaging/templates/mainnet-v1/archive/config.toml index 9eaafd3bee..cf5aeec17f 100644 --- a/packaging/templates/mainnet-v1/archive/config.toml +++ b/packaging/templates/mainnet-v1/archive/config.toml @@ -18,6 +18,7 @@ gcmode = "archive" # bind = "0.0.0.0" # nodiscover = false # nat = "any" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml b/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml index 94dd6634f0..4847761617 100644 --- a/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml +++ b/packaging/templates/mainnet-v1/sentry/sentry/bor/config.toml @@ -18,6 +18,7 @@ syncmode = "full" # bind = "0.0.0.0" # nodiscover = false # nat = "any" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml b/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml index 9c55683c96..cf03ec5a98 100644 --- a/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml +++ b/packaging/templates/mainnet-v1/sentry/validator/bor/config.toml @@ -20,6 +20,7 @@ syncmode = "full" # maxpendpeers = 50 # bind = "0.0.0.0" # nat = "any" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/mainnet-v1/without-sentry/bor/config.toml b/packaging/templates/mainnet-v1/without-sentry/bor/config.toml index 573f1f3be8..a474b90bb0 100644 --- a/packaging/templates/mainnet-v1/without-sentry/bor/config.toml +++ b/packaging/templates/mainnet-v1/without-sentry/bor/config.toml @@ -20,6 +20,7 @@ syncmode = "full" # bind = "0.0.0.0" # nodiscover = false # nat = "any" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/testnet-v4/archive/config.toml b/packaging/templates/testnet-v4/archive/config.toml index 1762fdf117..5cef74137d 100644 --- a/packaging/templates/testnet-v4/archive/config.toml +++ b/packaging/templates/testnet-v4/archive/config.toml @@ -18,6 +18,7 @@ gcmode = "archive" # bind = "0.0.0.0" # nodiscover = false # nat = "any" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml b/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml index ae191cec2c..89481eb275 100644 --- a/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml +++ b/packaging/templates/testnet-v4/sentry/sentry/bor/config.toml @@ -18,6 +18,7 @@ syncmode = "full" # bind = "0.0.0.0" # nodiscover = false # nat = "any" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/testnet-v4/sentry/validator/bor/config.toml b/packaging/templates/testnet-v4/sentry/validator/bor/config.toml index b441cc137d..e99d24a5ac 100644 --- a/packaging/templates/testnet-v4/sentry/validator/bor/config.toml +++ b/packaging/templates/testnet-v4/sentry/validator/bor/config.toml @@ -20,6 +20,7 @@ syncmode = "full" # maxpendpeers = 50 # bind = "0.0.0.0" # nat = "any" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] diff --git a/packaging/templates/testnet-v4/without-sentry/bor/config.toml b/packaging/templates/testnet-v4/without-sentry/bor/config.toml index 05a254e184..db72000c18 100644 --- a/packaging/templates/testnet-v4/without-sentry/bor/config.toml +++ b/packaging/templates/testnet-v4/without-sentry/bor/config.toml @@ -20,6 +20,7 @@ syncmode = "full" # bind = "0.0.0.0" # nodiscover = false # nat = "any" + # txarrivalwait = "500ms" # [p2p.discovery] # v5disc = false # bootnodes = [] From a5b49c313fdd1bf1875ce7a18cbf2fe9f56f0d9e Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 4 Apr 2023 23:20:07 -0700 Subject: [PATCH 09/12] added a maxTxArrivalWait constant to the fetcher. config params with longer durations than maxTxArrivalWait will default to the maxTxArrivalWait duration --- eth/fetcher/tx_fetcher.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index ad2ff13979..da6017b532 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -56,6 +56,10 @@ const ( // txGatherSlack is the interval used to collate almost-expired announces // with network fetches. txGatherSlack = 20 * time.Millisecond + + // maxTxArrivalWait is the longest acceptable duration for the txArrivalWait + // configuration value. Longer config values will default to this. + maxTxArrivalWait = 500 * time.Millisecond ) var ( @@ -337,6 +341,11 @@ func (f *TxFetcher) Start() { f.txArrivalWait = txGatherSlack } + // the txArrivalWait duration should not be greater than the maxTxArrivalWait duration + if f.txArrivalWait > maxTxArrivalWait { + f.txArrivalWait = maxTxArrivalWait + } + go f.loop() } From 7d765e03b04fe7c5997e526dc78593a01c6d665c Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 28 Apr 2023 16:25:08 -0700 Subject: [PATCH 10/12] added in some cuddling changes to address the linter / CI test fails --- core/tx_list.go | 2 ++ eth/fetcher/tx_fetcher.go | 7 ++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index fea4434b9b..1540e042da 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -512,7 +512,9 @@ func (l *txList) Filter(costLimit *uint256.Int, gasLimit uint64) (types.Transact } invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) } + l.txs.reheap() + return removed, invalids } diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index c147b7c50a..81f28d66d3 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -719,9 +719,10 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) { } } } - *timer = f.clock.AfterFunc(f.txArrivalWait-time.Duration(now-earliest), func() { - trigger <- struct{}{} - }) + *timer = f.clock.AfterFunc( + f.txArrivalWait-time.Duration(now-earliest), + func() { trigger <- struct{}{} }, + ) } // rescheduleTimeout iterates over all the transactions currently in flight and From 63f79e35598165955123251350820f0cd2e1f6cf Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 28 Apr 2023 16:50:08 -0700 Subject: [PATCH 11/12] added a space for some cuddlin --- eth/fetcher/tx_fetcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 81f28d66d3..8bdb1547e8 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -719,6 +719,7 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) { } } } + *timer = f.clock.AfterFunc( f.txArrivalWait-time.Duration(now-earliest), func() { trigger <- struct{}{} }, From 62d8cc02d32e8250308246e501774016b76784cb Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 1 May 2023 10:59:43 -0700 Subject: [PATCH 12/12] added in 500ms duration to txfetcher_fuzzer --- tests/fuzzers/txfetcher/txfetcher_fuzzer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index d1d6fdc665..fc15e07c7e 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -83,7 +83,7 @@ func Fuzz(input []byte) int { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - clock, rand, + clock, rand, 500*time.Millisecond, ) f.Start() defer f.Stop()