Skip to content

Commit

Permalink
all: EIP-1559 tx pool support (ethereum#22898)
Browse files Browse the repository at this point in the history
This pull request implements EIP-1559 compatible transaction pool with dual heap eviction ordering.
It is based on ethereum#22791
The eviction ordering scheme and the reasoning behind it is described here: https://gist.github.com/zsfelfoldi/9607ad248707a925b701f49787904fd6
  • Loading branch information
zsfelfoldi authored May 28, 2021
1 parent ee35ddc commit 966ee3a
Show file tree
Hide file tree
Showing 5 changed files with 743 additions and 143 deletions.
202 changes: 132 additions & 70 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math"
"math/big"
"sort"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -279,15 +280,23 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
// If there's an older better transaction, abort
old := l.txs.Get(tx.Nonce())
if old != nil {
// threshold = oldGP * (100 + priceBump) / 100
if old.FeeCapCmp(tx) >= 0 || old.TipCmp(tx) >= 0 {
return false, nil
}
// thresholdFeeCap = oldFC * (100 + priceBump) / 100
a := big.NewInt(100 + int64(priceBump))
a = a.Mul(a, old.GasPrice())
aFeeCap := new(big.Int).Mul(a, old.FeeCap())
aTip := a.Mul(a, old.Tip())

// thresholdTip = oldTip * (100 + priceBump) / 100
b := big.NewInt(100)
threshold := a.Div(a, b)
// Have to ensure that the new gas price is higher than the old gas
// price as well as checking the percentage threshold to ensure that
thresholdFeeCap := aFeeCap.Div(aFeeCap, b)
thresholdTip := aTip.Div(aTip, b)

// Have to ensure that either the new fee cap or tip is higher than the
// old ones as well as checking the percentage threshold to ensure that
// this is accurate for low (Wei-level) gas price replacements
if old.GasPriceCmp(tx) >= 0 || tx.GasPriceIntCmp(threshold) < 0 {
if tx.FeeCapIntCmp(thresholdFeeCap) < 0 || tx.TipIntCmp(thresholdTip) < 0 {
return false, nil
}
}
Expand Down Expand Up @@ -406,52 +415,84 @@ func (l *txList) LastElement() *types.Transaction {
}

// priceHeap is a heap.Interface implementation over transactions for retrieving
// price-sorted transactions to discard when the pool fills up.
type priceHeap []*types.Transaction
// price-sorted transactions to discard when the pool fills up. If baseFee is set
// then the heap is sorted based on the effective tip based on the given base fee.
// If baseFee is nil then the sorting is based on feeCap.
type priceHeap struct {
baseFee *big.Int // heap should always be re-sorted after baseFee is changed
list []*types.Transaction
}

func (h priceHeap) Len() int { return len(h) }
func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *priceHeap) Len() int { return len(h.list) }
func (h *priceHeap) Swap(i, j int) { h.list[i], h.list[j] = h.list[j], h.list[i] }

func (h priceHeap) Less(i, j int) bool {
// Sort primarily by price, returning the cheaper one
switch h[i].GasPriceCmp(h[j]) {
func (h *priceHeap) Less(i, j int) bool {
switch h.cmp(h.list[i], h.list[j]) {
case -1:
return true
case 1:
return false
default:
return h.list[i].Nonce() > h.list[j].Nonce()
}
// If the prices match, stabilize via nonces (high nonce is worse)
return h[i].Nonce() > h[j].Nonce()
}

func (h *priceHeap) cmp(a, b *types.Transaction) int {
if h.baseFee != nil {
// Compare effective tips if baseFee is specified
if c := a.EffectiveTipCmp(b, h.baseFee); c != 0 {
return c
}
}
// Compare fee caps if baseFee is not specified or effective tips are equal
if c := a.FeeCapCmp(b); c != 0 {
return c
}
// Compare tips if effective tips and fee caps are equal
return a.TipCmp(b)
}

func (h *priceHeap) Push(x interface{}) {
*h = append(*h, x.(*types.Transaction))
tx := x.(*types.Transaction)
h.list = append(h.list, tx)
}

func (h *priceHeap) Pop() interface{} {
old := *h
old := h.list
n := len(old)
x := old[n-1]
old[n-1] = nil
*h = old[0 : n-1]
h.list = old[0 : n-1]
return x
}

// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way. It's built opon the all transactions
// in txpool but only interested in the remote part. It means only remote transactions
// will be considered for tracking, sorting, eviction, etc.
//
// Two heaps are used for sorting: the urgent heap (based on effective tip in the next
// block) and the floating heap (based on feeCap). Always the bigger heap is chosen for
// eviction. Transactions evicted from the urgent heap are first demoted into the floating heap.
// In some cases (during a congestion, when blocks are full) the urgent heap can provide
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
// the floating heap is better. When baseFee is decreasing they behave similarly.
type txPricedList struct {
all *txLookup // Pointer to the map of all transactions
remotes *priceHeap // Heap of prices of all the stored **remote** transactions
stales int // Number of stale price points to (re-heap trigger)
all *txLookup // Pointer to the map of all transactions
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
stales int // Number of stale price points to (re-heap trigger)
}

const (
// urgentRatio : floatingRatio is the capacity ratio of the two queues
urgentRatio = 4
floatingRatio = 1
)

// newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *txLookup) *txPricedList {
return &txPricedList{
all: all,
remotes: new(priceHeap),
all: all,
}
}

Expand All @@ -460,7 +501,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
if local {
return
}
heap.Push(l.remotes, tx)
// Insert every new transaction to the urgent heap first; Discard will balance the heaps
heap.Push(&l.urgent, tx)
}

// Removed notifies the prices transaction list that an old transaction dropped
Expand All @@ -469,58 +511,43 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
l.stales += count
if l.stales <= len(*l.remotes)/4 {
if l.stales <= (len(l.urgent.list)+len(l.floating.list))/4 {
return
}
// Seems we've reached a critical number of stale transactions, reheap
l.Reheap()
}

// Cap finds all the transactions below the given price threshold, drops them
// from the priced list and returns them for further removal from the entire pool.
//
// Note: only remote transactions will be considered for eviction.
func (l *txPricedList) Cap(threshold *big.Int) types.Transactions {
drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
for len(*l.remotes) > 0 {
// Discard stale transactions if found during cleanup
cheapest := (*l.remotes)[0]
if l.all.GetRemote(cheapest.Hash()) == nil { // Removed or migrated
heap.Pop(l.remotes)
l.stales--
continue
}
// Stop the discards if we've reached the threshold
if cheapest.GasPriceIntCmp(threshold) >= 0 {
break
}
heap.Pop(l.remotes)
drop = append(drop, cheapest)
}
return drop
}

// Underpriced checks whether a transaction is cheaper than (or as cheap as) the
// lowest priced (remote) transaction currently being tracked.
func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
// Note: with two queues, being underpriced is defined as being worse than the worst item
// in all non-empty queues if there is any. If both queues are empty then nothing is underpriced.
return (l.underpricedFor(&l.urgent, tx) || len(l.urgent.list) == 0) &&
(l.underpricedFor(&l.floating, tx) || len(l.floating.list) == 0) &&
(len(l.urgent.list) != 0 || len(l.floating.list) != 0)
}

// underpricedFor checks whether a transaction is cheaper than (or as cheap as) the
// lowest priced (remote) transaction in the given heap.
func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
// Discard stale price points if found at the heap start
for len(*l.remotes) > 0 {
head := []*types.Transaction(*l.remotes)[0]
for len(h.list) > 0 {
head := h.list[0]
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
l.stales--
heap.Pop(l.remotes)
heap.Pop(h)
continue
}
break
}
// Check if the transaction is underpriced or not
if len(*l.remotes) == 0 {
if len(h.list) == 0 {
return false // There is no remote transaction at all.
}
// If the remote transaction is even cheaper than the
// cheapest one tracked locally, reject it.
cheapest := []*types.Transaction(*l.remotes)[0]
return cheapest.GasPriceCmp(tx) >= 0
return h.cmp(h.list[0], tx) >= 0
}

// Discard finds a number of most underpriced transactions, removes them from the
Expand All @@ -529,21 +556,36 @@ func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
// Note local transaction won't be considered for eviction.
func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
for len(*l.remotes) > 0 && slots > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.remotes).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales--
continue
for slots > 0 {
if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio || floatingRatio == 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.urgent).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales--
continue
}
// Non stale transaction found, move to floating heap
heap.Push(&l.floating, tx)
} else {
if len(l.floating.list) == 0 {
// Stop if both heaps are empty
break
}
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.floating).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales--
continue
}
// Non stale transaction found, discard it
drop = append(drop, tx)
slots -= numSlots(tx)
}
// Non stale transaction found, discard it
drop = append(drop, tx)
slots -= numSlots(tx)
}
// If we still can't make enough room for the new transaction
if slots > 0 && !force {
for _, tx := range drop {
heap.Push(l.remotes, tx)
heap.Push(&l.urgent, tx)
}
return nil, false
}
Expand All @@ -552,12 +594,32 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)

// Reheap forcibly rebuilds the heap based on the current remote transaction set.
func (l *txPricedList) Reheap() {
reheap := make(priceHeap, 0, l.all.RemoteCount())

l.stales, l.remotes = 0, &reheap
start := time.Now()
l.stales = 0
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
*l.remotes = append(*l.remotes, tx)
l.urgent.list = append(l.urgent.list, tx)
return true
}, false, true) // Only iterate remotes
heap.Init(l.remotes)
heap.Init(&l.urgent)

// balance out the two heaps by moving the worse half of transactions into the
// floating heap
// Note: Discard would also do this before the first eviction but Reheap can do
// is more efficiently. Also, Underpriced would work suboptimally the first time
// if the floating queue was empty.
floatingCount := len(l.urgent.list) * floatingRatio / (urgentRatio + floatingRatio)
l.floating.list = make([]*types.Transaction, floatingCount)
for i := 0; i < floatingCount; i++ {
l.floating.list[i] = heap.Pop(&l.urgent).(*types.Transaction)
}
heap.Init(&l.floating)
reheapTimer.Update(time.Since(start))
}

// SetBaseFee updates the base fee and triggers a re-heap. Note that Removed is not
// necessary to call right before SetBaseFee when processing a new block.
func (l *txPricedList) SetBaseFee(baseFee *big.Int) {
l.urgent.baseFee = baseFee
l.Reheap()
}
Loading

0 comments on commit 966ee3a

Please sign in to comment.