From 7fd1790409ba98bab8c4f9f07ed99f14dbdcf685 Mon Sep 17 00:00:00 2001 From: redhdx <136775144+redhdx@users.noreply.github.com> Date: Mon, 5 Aug 2024 22:07:35 +0800 Subject: [PATCH] feature(op-geth): add opbnb gasless solution (#130) --- cmd/geth/main.go | 4 + cmd/utils/flags.go | 41 ++ consensus/misc/eip1559/eip1559.go | 3 + core/state_transition.go | 16 +- core/txpool/bundlepool/bundlepool.go | 455 +++++++++++++++++++++++ core/txpool/bundlepool/config.go | 25 ++ core/txpool/subpool.go | 18 + core/txpool/txpool.go | 52 ++- core/types/bundle.go | 71 ++++ core/types/bundle_gasless.go | 20 + core/types/receipt.go | 3 + eth/api_backend.go | 30 ++ eth/backend.go | 9 + eth/ethconfig/config.go | 6 +- eth/ethconfig/gen_config.go | 19 + ethclient/ethclient.go | 30 ++ internal/ethapi/api_bundle.go | 150 ++++++++ internal/ethapi/api_test.go | 9 + internal/ethapi/backend.go | 6 + internal/ethapi/transaction_args_test.go | 7 + internal/flags/categories.go | 2 + miner/bundle_cache.go | 88 +++++ miner/miner.go | 117 +++++- miner/worker.go | 79 +++- miner/worker_builder.go | 450 ++++++++++++++++++++++ params/config.go | 16 +- params/protocol_params.go | 2 + 27 files changed, 1700 insertions(+), 28 deletions(-) create mode 100644 core/txpool/bundlepool/bundlepool.go create mode 100644 core/txpool/bundlepool/config.go create mode 100644 core/types/bundle.go create mode 100644 core/types/bundle_gasless.go create mode 100644 internal/ethapi/api_bundle.go create mode 100644 miner/bundle_cache.go create mode 100644 miner/worker_builder.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 3f19f7b260..23761964c4 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -85,6 +85,7 @@ var ( utils.TxPoolLifetimeFlag, utils.TxPoolReannounceTimeFlag, utils.TxPoolReannounceRemotesFlag, + utils.BundlePoolGlobalSlotsFlag, utils.BlobPoolDataDirFlag, utils.BlobPoolDataCapFlag, utils.BlobPoolPriceBumpFlag, @@ -135,6 +136,9 @@ var ( utils.MinerExtraDataFlag, utils.MinerRecommitIntervalFlag, utils.MinerNewPayloadTimeout, + utils.MevEnabledFlag, + utils.MevBundleReceiverUrlFlag, + utils.MevBundleGasPriceFloorFlag, utils.NATFlag, utils.NoDiscoverFlag, utils.DiscoveryV4Flag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 70621a57d9..8d1a1fe1d6 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -23,6 +23,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "math" "math/big" "net" @@ -440,6 +441,13 @@ var ( Value: ethconfig.Defaults.TxPool.ReannounceRemotes, Category: flags.TxPoolCategory, } + // bundle pool settings + BundlePoolGlobalSlotsFlag = &cli.Uint64Flag{ + Name: "bundlepool.globalslots", + Usage: "Maximum number of executable bundle slots for all accounts", + Value: ethconfig.Defaults.BundlePool.GlobalSlots, + Category: flags.BundlePoolCategory, + } // Blob transaction pool settings BlobPoolDataDirFlag = &cli.StringFlag{ Name: "blobpool.datadir", @@ -558,6 +566,22 @@ var ( Value: ethconfig.Defaults.Miner.NewPayloadTimeout, Category: flags.MinerCategory, } + MevEnabledFlag = &cli.BoolFlag{ + Name: "mev.enable", + Usage: "Enable mev", + Category: flags.MEVCategory, + } + MevBundleReceiverUrlFlag = &cli.StringFlag{ + Name: "mev.bundle.receiver.url", + Usage: "Url of bundle receiver endpoint to use. Multiple urls are supported, separated by commas", + Category: flags.MEVCategory, + } + MevBundleGasPriceFloorFlag = &cli.Int64Flag{ + Name: "mev.bundle.gasprice.floor", + Usage: "Minimum bundle gas price for mev", + Value: ethconfig.Defaults.Miner.Mev.MevBundleGasPriceFloor, + Category: flags.MEVCategory, + } // Account settings UnlockedAccountFlag = &cli.StringFlag{ @@ -1699,6 +1723,12 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) { } } +func setBundlePool(ctx *cli.Context, cfg *bundlepool.Config) { + if ctx.IsSet(BundlePoolGlobalSlotsFlag.Name) { + cfg.GlobalSlots = ctx.Uint64(BundlePoolGlobalSlotsFlag.Name) + } +} + func setMiner(ctx *cli.Context, cfg *miner.Config) { if ctx.IsSet(MinerExtraDataFlag.Name) { cfg.ExtraData = []byte(ctx.String(MinerExtraDataFlag.Name)) @@ -1718,6 +1748,16 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) { if ctx.IsSet(RollupComputePendingBlock.Name) { cfg.RollupComputePendingBlock = ctx.Bool(RollupComputePendingBlock.Name) } + if ctx.IsSet(MevEnabledFlag.Name) { + cfg.Mev.MevEnabled = ctx.Bool(MevEnabledFlag.Name) + } + if ctx.IsSet(MevBundleReceiverUrlFlag.Name) { + url := ctx.String(MevBundleReceiverUrlFlag.Name) + cfg.Mev.MevReceivers = strings.Split(url, ",") + } + if ctx.IsSet(MevBundleGasPriceFloorFlag.Name) { + cfg.Mev.MevBundleGasPriceFloor = ctx.Int64(MevBundleGasPriceFloorFlag.Name) + } } func setRequiredBlocks(ctx *cli.Context, cfg *ethconfig.Config) { @@ -1799,6 +1839,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { setEtherbase(ctx, cfg) setGPO(ctx, &cfg.GPO) setTxPool(ctx, &cfg.TxPool) + setBundlePool(ctx, &cfg.BundlePool) setMiner(ctx, &cfg.Miner) setRequiredBlocks(ctx, cfg) setLes(ctx, cfg) diff --git a/consensus/misc/eip1559/eip1559.go b/consensus/misc/eip1559/eip1559.go index a66298af69..e1c85bd091 100644 --- a/consensus/misc/eip1559/eip1559.go +++ b/consensus/misc/eip1559/eip1559.go @@ -58,6 +58,9 @@ func VerifyEIP1559Header(config *params.ChainConfig, parent, header *types.Heade // CalcBaseFee calculates the basefee of the header. // The time belongs to the new block to check if Canyon is activted or not func CalcBaseFee(config *params.ChainConfig, parent *types.Header, time uint64) *big.Int { + if config.IsWright(time) { + return new(big.Int).SetUint64(params.OpBNBBaseFeeForGasLess) + } // If the current block is the first EIP-1559 block, return the InitialBaseFee. if !config.IsLondon(parent.Number) { return new(big.Int).SetUint64(params.InitialBaseFee) diff --git a/core/state_transition.go b/core/state_transition.go index 932e7aa921..898ddc31a6 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -250,7 +250,11 @@ func (st *StateTransition) buyGas() error { mgval = mgval.Mul(mgval, st.msg.GasPrice) var l1Cost *big.Int if st.evm.Context.L1CostFunc != nil && !st.msg.SkipAccountChecks { - l1Cost = st.evm.Context.L1CostFunc(st.msg.RollupCostData, st.evm.Context.Time) + if st.msg.GasPrice.Cmp(big.NewInt(0)) == 0 && st.evm.ChainConfig().IsWright(st.evm.Context.Time) { + l1Cost = big.NewInt(0) + } else { + l1Cost = st.evm.Context.L1CostFunc(st.msg.RollupCostData, st.evm.Context.Time) + } if l1Cost != nil { mgval = mgval.Add(mgval, l1Cost) } @@ -546,8 +550,14 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { // Note optimismConfig will not be nil if rules.IsOptimismBedrock is true if optimismConfig := st.evm.ChainConfig().Optimism; optimismConfig != nil && rules.IsOptimismBedrock && !st.msg.IsDepositTx { st.state.AddBalance(params.OptimismBaseFeeRecipient, new(big.Int).Mul(new(big.Int).SetUint64(st.gasUsed()), st.evm.Context.BaseFee)) - if cost := st.evm.Context.L1CostFunc(st.msg.RollupCostData, st.evm.Context.Time); cost != nil { - st.state.AddBalance(params.OptimismL1FeeRecipient, cost) + var l1Cost *big.Int + if st.msg.GasPrice.Cmp(big.NewInt(0)) == 0 && st.evm.ChainConfig().IsWright(st.evm.Context.Time) { + l1Cost = big.NewInt(0) + } else { + l1Cost = st.evm.Context.L1CostFunc(st.msg.RollupCostData, st.evm.Context.Time) + } + if l1Cost != nil { + st.state.AddBalance(params.OptimismL1FeeRecipient, l1Cost) } } diff --git a/core/txpool/bundlepool/bundlepool.go b/core/txpool/bundlepool/bundlepool.go new file mode 100644 index 0000000000..1f5589badd --- /dev/null +++ b/core/txpool/bundlepool/bundlepool.go @@ -0,0 +1,455 @@ +package bundlepool + +import ( + "container/heap" + "context" + "errors" + mapset "github.com/deckarep/golang-set/v2" + "github.com/ethereum/go-ethereum/miner" + "github.com/ethereum/go-ethereum/rpc" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" +) + +const ( + // TODO: decide on a good default value + // bundleSlotSize is used to calculate how many data slots a single bundle + // takes up based on its size. The slots are used as DoS protection, ensuring + // that validating a new bundle remains a constant operation (in reality + // O(maxslots), where max slots are 4 currently). + bundleSlotSize = 128 * 1024 // 128KB + + maxMinTimestampFromNow = int64(300) // 5 minutes + + dialAttempts = 3 + dialSleep = 3 * time.Second +) + +var ( + bundleGauge = metrics.NewRegisteredGauge("bundlepool/bundles", nil) + slotsGauge = metrics.NewRegisteredGauge("bundlepool/slots", nil) + bundleDeliverAll = metrics.NewRegisteredCounter("bundle/deliver/all", nil) + bundleDeliverFailed = metrics.NewRegisteredCounter("bundle/deliver/failed", nil) +) + +var ( + // ErrSimulatorMissing is returned if the bundle simulator is missing. + ErrSimulatorMissing = errors.New("bundle simulator is missing") + + // ErrBundleTimestampTooHigh is returned if the bundle's MinTimestamp is too high. + ErrBundleTimestampTooHigh = errors.New("bundle MinTimestamp is too high") + + // ErrBundleGasPriceLow is returned if the bundle gas price is too low. + ErrBundleGasPriceLow = errors.New("bundle gas price is too low") + + // ErrBundleAlreadyExist is returned if the bundle is already contained + // within the pool. + ErrBundleAlreadyExist = errors.New("bundle already exist") +) + +// BlockChain defines the minimal set of methods needed to back a tx pool with +// a chain. Exists to allow mocking the live chain out of tests. +type BlockChain interface { + // Config retrieves the chain's fork configuration. + Config() *params.ChainConfig + + // CurrentBlock returns the current head of the chain. + CurrentBlock() *types.Header + + // GetBlock retrieves a specific block, used during pool resets. + GetBlock(hash common.Hash, number uint64) *types.Block + + // StateAt returns a state database for a given root hash (generally the head). + StateAt(root common.Hash) (*state.StateDB, error) +} + +type BundleSimulator interface { + SimulateBundle(bundle *types.Bundle) (*big.Int, error) +} + +type BundlePool struct { + config Config + mevConfig miner.MevConfig + chain BlockChain + + bundles map[common.Hash]*types.Bundle + bundleHeap BundleHeap + mu sync.RWMutex + + slots uint64 // Number of slots currently allocated + + simulator BundleSimulator + + bundleReceiverClients map[string]*rpc.Client +} + +func New(config Config, mevConfig miner.MevConfig, chain BlockChain) *BundlePool { + // Sanitize the input to ensure no vulnerable gas prices are set + config = (&config).sanitize() + + pool := &BundlePool{ + config: config, + mevConfig: mevConfig, + chain: chain, + bundles: make(map[common.Hash]*types.Bundle), + bundleHeap: make(BundleHeap, 0), + bundleReceiverClients: make(map[string]*rpc.Client), + } + if pool.mevConfig.MevReceivers == nil { + return pool + } + for _, v := range mevConfig.MevReceivers { + pool.register(v) + } + if len(pool.bundleReceiverClients) == 0 { + log.Error("No valid bundleReceivers") + } + + return pool +} + +func (p *BundlePool) register(url string) { + var receiverClient *rpc.Client + if url != "" { + var err error + for i := 1; i < dialAttempts+1; i++ { + receiverClient, err = rpc.Dial(url) + if err != nil { + log.Error("failed to dial bundle receiver", "attempts", i, "url", url, "err", err) + if i == dialAttempts { + return + } + time.Sleep(dialSleep) + continue + } + p.bundleReceiverClients[url] = receiverClient + log.Info("success to dial bundle receiver", "url", url) + return + } + } +} + +func (p *BundlePool) SetBundleSimulator(simulator BundleSimulator) { + p.simulator = simulator +} + +func (p *BundlePool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error { + return nil +} + +func (p *BundlePool) FilterBundle(bundle *types.Bundle) bool { + for _, tx := range bundle.Txs { + if !p.filter(tx) { + return false + } + } + return true +} + +// AddBundle adds a mev bundle to the pool +func (p *BundlePool) AddBundle(bundle *types.Bundle, originBundle *types.SendBundleArgs) error { + if p.simulator == nil { + return ErrSimulatorMissing + } + + if bundle.MinTimestamp > uint64(time.Now().Unix()+maxMinTimestampFromNow) { + return ErrBundleTimestampTooHigh + } + + price, err := p.simulator.SimulateBundle(bundle) + if err != nil { + return err + } + + p.mu.Lock() + defer p.mu.Unlock() + + if price.Cmp(p.minimalBundleGasPrice()) <= 0 && p.slots+numSlots(bundle) > p.config.GlobalSlots { + return ErrBundleGasPriceLow + } + bundle.Price = price + + hash := bundle.Hash() + if _, ok := p.bundles[hash]; ok { + return ErrBundleAlreadyExist + } + + for url, cli := range p.bundleReceiverClients { + cli := cli + url := url + go func() { + var hash common.Hash + err := cli.CallContext(context.Background(), &hash, "eth_sendBundle", *originBundle) + if err != nil { + bundleDeliverFailed.Inc(1) + log.Error("failed to deliver bundle to receiver", "url", url, "err", err) + } + }() + bundleDeliverAll.Inc(1) + } + + for p.slots+numSlots(bundle) > p.config.GlobalSlots { + p.drop() + } + p.bundles[hash] = bundle + heap.Push(&p.bundleHeap, bundle) + p.slots += numSlots(bundle) + + bundleGauge.Update(int64(len(p.bundles))) + slotsGauge.Update(int64(p.slots)) + return nil +} + +func (p *BundlePool) GetBundle(hash common.Hash) *types.Bundle { + p.mu.RUnlock() + defer p.mu.RUnlock() + + return p.bundles[hash] +} + +func (p *BundlePool) PruneBundle(hash common.Hash) { + p.mu.Lock() + defer p.mu.Unlock() + p.deleteBundle(hash) +} + +func (p *BundlePool) PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle { + p.mu.Lock() + defer p.mu.Unlock() + + ret := make([]*types.Bundle, 0) + for hash, bundle := range p.bundles { + // Prune outdated bundles + if (bundle.MaxTimestamp != 0 && blockTimestamp > bundle.MaxTimestamp) || + (bundle.MaxBlockNumber != 0 && blockNumber > bundle.MaxBlockNumber) { + p.deleteBundle(hash) + continue + } + + // Roll over future bundles + if bundle.MinTimestamp != 0 && blockTimestamp < bundle.MinTimestamp { + continue + } + + // return the ones that are in time + ret = append(ret, bundle) + } + + bundleGauge.Update(int64(len(p.bundles))) + slotsGauge.Update(int64(p.slots)) + return ret +} + +// AllBundles returns all the bundles currently in the pool +func (p *BundlePool) AllBundles() []*types.Bundle { + p.mu.RLock() + defer p.mu.RUnlock() + bundles := make([]*types.Bundle, 0, len(p.bundles)) + for _, bundle := range p.bundles { + bundles = append(bundles, bundle) + } + return bundles +} + +func (p *BundlePool) Filter(tx *types.Transaction) bool { + return false +} + +func (p *BundlePool) Close() error { + log.Info("Bundle pool stopped") + return nil +} + +func (p *BundlePool) Reset(oldHead, newHead *types.Header) { + p.reset(newHead) +} + +// SetGasTip updates the minimum price required by the subpool for a new +// transaction, and drops all transactions below this threshold. +func (p *BundlePool) SetGasTip(tip *big.Int) {} + +// Has returns an indicator whether subpool has a transaction cached with the +// given hash. +func (p *BundlePool) Has(hash common.Hash) bool { + return false +} + +// Get returns a transaction if it is contained in the pool, or nil otherwise. +func (p *BundlePool) Get(hash common.Hash) *types.Transaction { + return nil +} + +// Add enqueues a batch of transactions into the pool if they are valid. Due +// to the large transaction churn, add may postpone fully integrating the tx +// to a later point to batch multiple ones together. +func (p *BundlePool) Add(txs []*types.Transaction, local bool, sync bool) []error { + return nil +} + +// Pending retrieves all currently processable transactions, grouped by origin +// account and sorted by nonce. +func (p *BundlePool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction { + return nil +} + +// SubscribeTransactions subscribes to new transaction events. +func (p *BundlePool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { + return nil +} + +// SubscribeReannoTxsEvent should return an event subscription of +// ReannoTxsEvent and send events to the given channel. +func (p *BundlePool) SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription { + return nil +} + +// Nonce returns the next nonce of an account, with all transactions executable +// by the pool already applied on topool. +func (p *BundlePool) Nonce(addr common.Address) uint64 { + return 0 +} + +// Stats retrieves the current pool stats, namely the number of pending and the +// number of queued (non-executable) transactions. +func (p *BundlePool) Stats() (int, int) { + return 0, 0 +} + +// Content retrieves the data content of the transaction pool, returning all the +// pending as well as queued transactions, grouped by account and sorted by nonce. +func (p *BundlePool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { + return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction) +} + +// ContentFrom retrieves the data content of the transaction pool, returning the +// pending as well as queued transactions of this address, grouped by nonce. +func (p *BundlePool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { + return []*types.Transaction{}, []*types.Transaction{} +} + +// Locals retrieves the accounts currently considered local by the pool. +func (p *BundlePool) Locals() []common.Address { + return []common.Address{} +} + +// Status returns the known status (unknown/pending/queued) of a transaction +// identified by their hashes. +func (p *BundlePool) Status(hash common.Hash) txpool.TxStatus { + return txpool.TxStatusUnknown +} + +func (p *BundlePool) filter(tx *types.Transaction) bool { + switch tx.Type() { + case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType: + return true + default: + return false + } +} + +func (p *BundlePool) reset(newHead *types.Header) { + p.mu.Lock() + defer p.mu.Unlock() + + // Prune outdated bundles and invalid bundles + block := p.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) + txSet := mapset.NewSet[common.Hash]() + if block != nil { + txs := block.Transactions() + for _, tx := range txs { + txSet.Add(tx.Hash()) + } + } + for hash, bundle := range p.bundles { + if (bundle.MaxTimestamp != 0 && newHead.Time > bundle.MaxTimestamp) || + (bundle.MaxBlockNumber != 0 && newHead.Number.Cmp(new(big.Int).SetUint64(bundle.MaxBlockNumber)) > 0) { + p.slots -= numSlots(p.bundles[hash]) + delete(p.bundles, hash) + } else if txSet.Contains(bundle.Txs[0].Hash()) { + p.slots -= numSlots(p.bundles[hash]) + delete(p.bundles, hash) + } + } + bundleGauge.Update(int64(len(p.bundles))) + slotsGauge.Update(int64(p.slots)) +} + +// deleteBundle deletes a bundle from the pool. +// It assumes that the caller holds the pool's lock. +func (p *BundlePool) deleteBundle(hash common.Hash) { + if p.bundles[hash] == nil { + return + } + + p.slots -= numSlots(p.bundles[hash]) + delete(p.bundles, hash) +} + +// drop removes the bundle with the lowest gas price from the pool. +func (p *BundlePool) drop() { + for len(p.bundleHeap) > 0 { + // Pop the bundle with the lowest gas price + // the min element in the heap may not exist in the pool as it may be pruned + leastPriceBundleHash := heap.Pop(&p.bundleHeap).(*types.Bundle).Hash() + if _, ok := p.bundles[leastPriceBundleHash]; ok { + p.deleteBundle(leastPriceBundleHash) + break + } + } +} + +// minimalBundleGasPrice return the lowest gas price from the pool. +func (p *BundlePool) minimalBundleGasPrice() *big.Int { + for len(p.bundleHeap) != 0 { + leastPriceBundleHash := p.bundleHeap[0].Hash() + if bundle, ok := p.bundles[leastPriceBundleHash]; ok { + return bundle.Price + } + heap.Pop(&p.bundleHeap) + } + return new(big.Int) +} + +func (p *BundlePool) SetMaxGas(maxGas uint64) {} + +// ===================================================================================================================== + +// numSlots calculates the number of slots needed for a single bundle. +func numSlots(bundle *types.Bundle) uint64 { + return (bundle.Size() + bundleSlotSize - 1) / bundleSlotSize +} + +// ===================================================================================================================== + +type BundleHeap []*types.Bundle + +func (h *BundleHeap) Len() int { return len(*h) } + +func (h *BundleHeap) Less(i, j int) bool { + return (*h)[i].Price.Cmp((*h)[j].Price) == -1 +} + +func (h *BundleHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } + +func (h *BundleHeap) Push(x interface{}) { + *h = append(*h, x.(*types.Bundle)) +} + +func (h *BundleHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/core/txpool/bundlepool/config.go b/core/txpool/bundlepool/config.go new file mode 100644 index 0000000000..d99e37281f --- /dev/null +++ b/core/txpool/bundlepool/config.go @@ -0,0 +1,25 @@ +package bundlepool + +import ( + "github.com/ethereum/go-ethereum/log" +) + +type Config struct { + GlobalSlots uint64 // Maximum number of bundle slots for all accounts +} + +// DefaultConfig contains the default configurations for the bundle pool. +var DefaultConfig = Config{ + GlobalSlots: 4096, +} + +// sanitize checks the provided user configurations and changes anything that's +// unreasonable or unworkable. +func (config *Config) sanitize() Config { + conf := *config + if conf.GlobalSlots < 1 { + log.Warn("Sanitizing invalid bundlepool bundle slots", "provided", conf.GlobalSlots, "updated", DefaultConfig.GlobalSlots) + conf.GlobalSlots = DefaultConfig.GlobalSlots + } + return conf +} diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index e4a0d1e941..d0a877ab96 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -142,3 +142,21 @@ type SubPool interface { // identified by their hashes. Status(hash common.Hash) TxStatus } + +type BundleSubpool interface { + // FilterBundle is a selector used to decide whether a bundle would be added + // to this particular subpool. + FilterBundle(bundle *types.Bundle) bool + + // AddBundle enqueues a bundle into the pool if it is valid. + AddBundle(bundle *types.Bundle, originBundle *types.SendBundleArgs) error + + // PendingBundles retrieves all currently processable bundles. + PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle + + // AllBundles returns all the bundles currently in the pool. + AllBundles() []*types.Bundle + + // PruneBundle removes a bundle from the pool. + PruneBundle(hash common.Hash) +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index a92a4b6355..4eca01f9b4 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -307,6 +307,49 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { return errs } +// AddBundle enqueues a bundle into the pool if it is valid. +func (p *TxPool) AddBundle(bundle *types.Bundle, originBundle *types.SendBundleArgs) error { + // Try to find a sub pool that accepts the bundle + for _, subpool := range p.subpools { + if bundleSubpool, ok := subpool.(BundleSubpool); ok { + if bundleSubpool.FilterBundle(bundle) { + return bundleSubpool.AddBundle(bundle, originBundle) + } + } + } + return errors.New("no subpool accepts the bundle") +} + +// PruneBundle removes a bundle from the pool. +func (p *TxPool) PruneBundle(hash common.Hash) { + for _, subpool := range p.subpools { + if bundleSubpool, ok := subpool.(BundleSubpool); ok { + bundleSubpool.PruneBundle(hash) + return // Only one subpool can have the bundle + } + } +} + +// PendingBundles retrieves all currently processable bundles. +func (p *TxPool) PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle { + for _, subpool := range p.subpools { + if bundleSubpool, ok := subpool.(BundleSubpool); ok { + return bundleSubpool.PendingBundles(blockNumber, blockTimestamp) + } + } + return nil +} + +// AllBundles returns all the bundles currently in the pool +func (p *TxPool) AllBundles() []*types.Bundle { + for _, subpool := range p.subpools { + if bundleSubpool, ok := subpool.(BundleSubpool); ok { + return bundleSubpool.AllBundles() + } + } + return nil +} + // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction { @@ -322,9 +365,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction // SubscribeTransactions registers a subscription for new transaction events, // supporting feeding only newly seen or also resurrected transactions. func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { - subs := make([]event.Subscription, len(p.subpools)) - for i, subpool := range p.subpools { - subs[i] = subpool.SubscribeTransactions(ch, reorgs) + subs := make([]event.Subscription, 0, len(p.subpools)) + for _, subpool := range p.subpools { + sub := subpool.SubscribeTransactions(ch, reorgs) + if sub != nil { // sub will be nil when subpool have been shut down + subs = append(subs, sub) + } } return p.subs.Track(event.JoinSubscriptions(subs...)) } diff --git a/core/types/bundle.go b/core/types/bundle.go new file mode 100644 index 0000000000..5eb0922b9c --- /dev/null +++ b/core/types/bundle.go @@ -0,0 +1,71 @@ +package types + +import ( + "math/big" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + // MaxBundleAliveBlock is the max alive block for bundle + MaxBundleAliveBlock = 60 + // MaxBundleAliveTime is the max alive time for bundle + MaxBundleAliveTime = 60 // second +) + +// SendBundleArgs represents the arguments for a call. +type SendBundleArgs struct { + Txs []hexutil.Bytes `json:"txs"` + MaxBlockNumber uint64 `json:"maxBlockNumber"` + MinTimestamp *uint64 `json:"minTimestamp"` + MaxTimestamp *uint64 `json:"maxTimestamp"` + RevertingTxHashes []common.Hash `json:"revertingTxHashes"` +} + +type Bundle struct { + Txs Transactions + MaxBlockNumber uint64 + MinTimestamp uint64 + MaxTimestamp uint64 + RevertingTxHashes []common.Hash + + Price *big.Int // for bundle compare and prune + + // caches + hash atomic.Value + size atomic.Value +} + +type SimulatedBundle struct { + OriginalBundle *Bundle + + BundleGasFees *big.Int + BundleGasPrice *big.Int + BundleGasUsed uint64 +} + +func (bundle *Bundle) Size() uint64 { + if size := bundle.size.Load(); size != nil { + return size.(uint64) + } + c := writeCounter(0) + rlp.Encode(&c, bundle) + + size := uint64(c) + bundle.size.Store(size) + return size +} + +// Hash returns the bundle hash. +func (bundle *Bundle) Hash() common.Hash { + if hash := bundle.hash.Load(); hash != nil { + return hash.(common.Hash) + } + + h := rlpHash(bundle) + bundle.hash.Store(h) + return h +} diff --git a/core/types/bundle_gasless.go b/core/types/bundle_gasless.go new file mode 100644 index 0000000000..3be28ebf1e --- /dev/null +++ b/core/types/bundle_gasless.go @@ -0,0 +1,20 @@ +package types + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" +) + +type SimulateGaslessBundleArgs struct { + Txs []hexutil.Bytes `json:"txs"` +} + +type GaslessTxSimResult struct { + Hash common.Hash + GasUsed uint64 +} + +type SimulateGaslessBundleResp struct { + ValidResults []GaslessTxSimResult + BasedBlockNumber int64 +} diff --git a/core/types/receipt.go b/core/types/receipt.go index 1b7d03459a..12dcf4c2cf 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -580,6 +580,9 @@ func (rs Receipts) DeriveFields(config *params.ChainConfig, hash common.Hash, nu } rs[i].L1GasPrice = l1BaseFee rs[i].L1Fee, rs[i].L1GasUsed = costFunc(txs[i].RollupCostData()) + if txs[i].GasPrice().Cmp(big.NewInt(0)) == 0 && config.IsWright(time) { + rs[i].L1Fee = big.NewInt(0) + } rs[i].FeeScalar = feeScalar } } diff --git a/eth/api_backend.go b/eth/api_backend.go index ee6b2db9b8..7aef569c34 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math/big" + "sort" "time" "github.com/ethereum/go-ethereum" @@ -323,6 +324,35 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0] } +func (b *EthAPIBackend) SendBundle(ctx context.Context, bundle *types.Bundle, originBundle *types.SendBundleArgs) error { + return b.eth.txPool.AddBundle(bundle, originBundle) +} + +func (b *EthAPIBackend) SimulateGaslessBundle(bundle *types.Bundle) (*types.SimulateGaslessBundleResp, error) { + return b.Miner().SimulateGaslessBundle(bundle) +} + +func (b *EthAPIBackend) BundlePrice() *big.Int { + bundles := b.eth.txPool.AllBundles() + gasFloor := big.NewInt(b.eth.config.Miner.Mev.MevBundleGasPriceFloor) + + if len(bundles) == 0 { + return gasFloor + } + + sort.SliceStable(bundles, func(i, j int) bool { + return bundles[j].Price.Cmp(bundles[i].Price) < 0 + }) + + idx := len(bundles) / 2 + + if bundles[idx] == nil || bundles[idx].Price.Cmp(gasFloor) < 0 { + return gasFloor + } + + return bundles[idx].Price +} + func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { pending := b.eth.txPool.Pending(false) var txs types.Transactions diff --git a/eth/backend.go b/eth/backend.go index d5b59984fa..f2d02fca67 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "math/big" "runtime" "sync" @@ -292,6 +293,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { blobPool := blobpool.New(config.BlobPool, eth.blockchain) txPools = append(txPools, blobPool) } + bundlePool := &bundlepool.BundlePool{} + if config.Miner.Mev.MevEnabled { + bundlePool = bundlepool.New(config.BundlePool, config.Miner.Mev, eth.blockchain) + txPools = append(txPools, bundlePool) + } eth.txPool, err = txpool.New(new(big.Int).SetUint64(config.TxPool.PriceLimit), eth.blockchain, txPools) if err != nil { return nil, err @@ -315,6 +321,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.miner = miner.New(eth, &config.Miner, eth.blockchain.Config(), eth.EventMux(), eth.engine, eth.isLocalBlock) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) + if config.Miner.Mev.MevEnabled { + bundlePool.SetBundleSimulator(eth.miner) + } eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, config.RollupDisableTxPoolAdmission, eth, nil} if eth.APIBackend.allowUnprotectedTxs { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 51ff72c6cc..0da45c08ed 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -19,6 +19,7 @@ package ethconfig import ( "errors" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "time" "github.com/ethereum/go-ethereum/common" @@ -170,8 +171,9 @@ type Config struct { Miner miner.Config // Transaction pool options - TxPool legacypool.Config - BlobPool blobpool.Config + TxPool legacypool.Config + BlobPool blobpool.Config + BundlePool bundlepool.Config // Gas Price Oracle options GPO gasprice.Config diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index d70729d7c6..3a1fba387a 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool/blobpool" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/gasprice" @@ -31,6 +32,8 @@ func (c Config) MarshalTOML() (interface{}, error) { StateScheme string `toml:",omitempty"` PathNodeBuffer pathdb.NodeBufferType `toml:",omitempty"` ProposeBlockInterval uint64 `toml:",omitempty"` + EnableProofKeeper bool `toml:",omitempty"` + KeepProofBlockSpan uint64 `toml:",omitempty"` JournalFileEnabled bool `toml:",omitempty"` RequiredBlocks map[uint64]common.Hash `toml:"-"` LightServ int `toml:",omitempty"` @@ -54,6 +57,7 @@ func (c Config) MarshalTOML() (interface{}, error) { Miner miner.Config TxPool legacypool.Config BlobPool blobpool.Config + BundlePool bundlepool.Config GPO gasprice.Config EnablePreimageRecording bool DocRoot string `toml:"-"` @@ -88,6 +92,8 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.StateScheme = c.StateScheme enc.PathNodeBuffer = c.PathNodeBuffer enc.ProposeBlockInterval = c.ProposeBlockInterval + enc.EnableProofKeeper = c.EnableProofKeeper + enc.KeepProofBlockSpan = c.KeepProofBlockSpan enc.JournalFileEnabled = c.JournalFileEnabled enc.RequiredBlocks = c.RequiredBlocks enc.LightServ = c.LightServ @@ -111,6 +117,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.Miner = c.Miner enc.TxPool = c.TxPool enc.BlobPool = c.BlobPool + enc.BundlePool = c.BundlePool enc.GPO = c.GPO enc.EnablePreimageRecording = c.EnablePreimageRecording enc.DocRoot = c.DocRoot @@ -149,6 +156,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { StateScheme *string `toml:",omitempty"` PathNodeBuffer *pathdb.NodeBufferType `toml:",omitempty"` ProposeBlockInterval *uint64 `toml:",omitempty"` + EnableProofKeeper *bool `toml:",omitempty"` + KeepProofBlockSpan *uint64 `toml:",omitempty"` JournalFileEnabled *bool `toml:",omitempty"` RequiredBlocks map[uint64]common.Hash `toml:"-"` LightServ *int `toml:",omitempty"` @@ -172,6 +181,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { Miner *miner.Config TxPool *legacypool.Config BlobPool *blobpool.Config + BundlePool *bundlepool.Config GPO *gasprice.Config EnablePreimageRecording *bool DocRoot *string `toml:"-"` @@ -235,6 +245,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.ProposeBlockInterval != nil { c.ProposeBlockInterval = *dec.ProposeBlockInterval } + if dec.EnableProofKeeper != nil { + c.EnableProofKeeper = *dec.EnableProofKeeper + } + if dec.KeepProofBlockSpan != nil { + c.KeepProofBlockSpan = *dec.KeepProofBlockSpan + } if dec.JournalFileEnabled != nil { c.JournalFileEnabled = *dec.JournalFileEnabled } @@ -304,6 +320,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.BlobPool != nil { c.BlobPool = *dec.BlobPool } + if dec.BundlePool != nil { + c.BundlePool = *dec.BundlePool + } if dec.GPO != nil { c.GPO = *dec.GPO } diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index e8a201f71b..d01aab51f1 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -667,6 +667,36 @@ func toCallArg(msg ethereum.CallMsg) interface{} { return arg } +// SimulateGaslessBundle simulates a gasless bundle +func (ec *Client) SimulateGaslessBundle(ctx context.Context, args types.SimulateGaslessBundleArgs) (*types.SimulateGaslessBundleResp, error) { + var bundle types.SimulateGaslessBundleResp + err := ec.c.CallContext(ctx, &bundle, "eth_simulateGaslessBundle", args) + if err != nil { + return nil, err + } + return &bundle, nil +} + +// SendBundle sends a bundle +func (ec *Client) SendBundle(ctx context.Context, args types.SendBundleArgs) (common.Hash, error) { + var hash common.Hash + err := ec.c.CallContext(ctx, &hash, "eth_sendBundle", args) + if err != nil { + return common.Hash{}, err + } + return hash, nil +} + +// BundlePrice returns the price of a bundle +func (ec *Client) BundlePrice(ctx context.Context) (*big.Int, error) { + var price *big.Int + err := ec.c.CallContext(ctx, &price, "eth_bundlePrice") + if err != nil { + return nil, err + } + return price, nil +} + // rpcProgress is a copy of SyncProgress with hex-encoded fields. type rpcProgress struct { StartingBlock hexutil.Uint64 diff --git a/internal/ethapi/api_bundle.go b/internal/ethapi/api_bundle.go new file mode 100644 index 0000000000..d291dc2083 --- /dev/null +++ b/internal/ethapi/api_bundle.go @@ -0,0 +1,150 @@ +package ethapi + +import ( + "context" + "errors" + "fmt" + "github.com/ethereum/go-ethereum/log" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +const InvalidBundleParamError = -38000 + +// PrivateTxBundleAPI offers an API for accepting bundled transactions +type PrivateTxBundleAPI struct { + b Backend +} + +// NewPrivateTxBundleAPI creates a new Tx Bundle API instance. +func NewPrivateTxBundleAPI(b Backend) *PrivateTxBundleAPI { + return &PrivateTxBundleAPI{b} +} + +func (s *PrivateTxBundleAPI) BundlePrice(ctx context.Context) *big.Int { + return s.b.BundlePrice() +} + +// SimulateGaslessBundle simulates the execution of a list of transactions with order +func (s *PrivateTxBundleAPI) SimulateGaslessBundle(_ context.Context, args types.SimulateGaslessBundleArgs) (*types.SimulateGaslessBundleResp, error) { + if len(args.Txs) == 0 { + return nil, newBundleError(errors.New("bundle missing txs")) + } + + var txs types.Transactions + + for _, encodedTx := range args.Txs { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + log.Error("failed to unmarshal gasless tx", "err", err) + continue + } + switch tx.Type() { + case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType: + default: + log.Error("transaction type not supported", "txType", tx.Type()) + continue + } + txs = append(txs, tx) + } + + bundle := &types.Bundle{ + Txs: txs, + } + + return s.b.SimulateGaslessBundle(bundle) +} + +// SendBundle will add the signed transaction to the transaction pool. +// The sender is responsible for signing the transaction and using the correct nonce and ensuring validity +func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args types.SendBundleArgs) (common.Hash, error) { + if len(args.Txs) == 0 { + return common.Hash{}, newBundleError(errors.New("bundle missing txs")) + } + + currentHeader := s.b.CurrentHeader() + + if args.MaxBlockNumber == 0 && (args.MaxTimestamp == nil || *args.MaxTimestamp == 0) { + maxTimeStamp := currentHeader.Time + types.MaxBundleAliveTime + args.MaxTimestamp = &maxTimeStamp + } + + if args.MaxBlockNumber != 0 && args.MaxBlockNumber > currentHeader.Number.Uint64()+types.MaxBundleAliveBlock { + return common.Hash{}, newBundleError(errors.New(fmt.Sprintf("the maxBlockNumber should not be lager than currentBlockNum + %d", types.MaxBundleAliveBlock))) + } + + if args.MaxTimestamp != nil && args.MinTimestamp != nil && *args.MaxTimestamp != 0 && *args.MinTimestamp != 0 { + if *args.MaxTimestamp <= *args.MinTimestamp { + return common.Hash{}, newBundleError(errors.New("the maxTimestamp should not be less than minTimestamp")) + } + } + + if args.MaxTimestamp != nil && *args.MaxTimestamp != 0 && *args.MaxTimestamp < currentHeader.Time { + return common.Hash{}, newBundleError(errors.New("the maxTimestamp should not be less than currentBlockTimestamp")) + } + + if (args.MaxTimestamp != nil && *args.MaxTimestamp > currentHeader.Time+types.MaxBundleAliveTime) || + (args.MinTimestamp != nil && *args.MinTimestamp > currentHeader.Time+types.MaxBundleAliveTime) { + return common.Hash{}, newBundleError(errors.New(fmt.Sprintf("the minTimestamp/maxTimestamp should not be later than currentBlockTimestamp + %d seconds", types.MaxBundleAliveTime))) + } + + var txs types.Transactions + + for _, encodedTx := range args.Txs { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + return common.Hash{}, err + } + txs = append(txs, tx) + } + + var minTimestamp, maxTimestamp uint64 + + if args.MinTimestamp != nil { + minTimestamp = *args.MinTimestamp + } + + if args.MaxTimestamp != nil { + maxTimestamp = *args.MaxTimestamp + } + + bundle := &types.Bundle{ + Txs: txs, + MaxBlockNumber: args.MaxBlockNumber, + MinTimestamp: minTimestamp, + MaxTimestamp: maxTimestamp, + RevertingTxHashes: args.RevertingTxHashes, + } + + // If the maxBlockNumber and maxTimestamp are not set, set max ddl of bundle as types.MaxBundleAliveBlock + if bundle.MaxBlockNumber == 0 && bundle.MaxTimestamp == 0 { + bundle.MaxBlockNumber = currentHeader.Number.Uint64() + types.MaxBundleAliveBlock + } + + err := s.b.SendBundle(ctx, bundle, &args) + if err != nil { + return common.Hash{}, err + } + + return bundle.Hash(), nil +} + +func newBundleError(err error) *bundleError { + return &bundleError{ + error: err, + } +} + +// bundleError is an API error that encompasses an invalid bundle with JSON error +// code and a binary data blob. +type bundleError struct { + error +} + +// ErrorCode returns the JSON error code for an invalid bundle. +// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal +func (e *bundleError) ErrorCode() int { + return InvalidBundleParamError +} diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 505de06205..b1ffd68685 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -722,6 +722,15 @@ func (b testBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) even func (b testBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { panic("implement me") } +func (b testBackend) SimulateGaslessBundle(bundle *types.Bundle) (*types.SimulateGaslessBundleResp, error) { + panic("implement me") +} +func (b testBackend) SendBundle(ctx context.Context, bundle *types.Bundle, originBundle *types.SendBundleArgs) error { + panic("implement me") +} +func (b testBackend) BundlePrice() *big.Int { + panic("implement me") +} func (b testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.db, txHash) return tx, blockHash, blockNumber, index, nil diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 7da74f1fc9..f758251c12 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -77,6 +77,9 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error + SimulateGaslessBundle(bundle *types.Bundle) (*types.SimulateGaslessBundleResp, error) + SendBundle(ctx context.Context, bundle *types.Bundle, originBundle *types.SendBundleArgs) error + BundlePrice() *big.Int GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction @@ -127,6 +130,9 @@ func GetAPIs(apiBackend Backend) []rpc.API { }, { Namespace: "personal", Service: NewPersonalAccountAPI(apiBackend, nonceLock), + }, { + Namespace: "eth", + Service: NewPrivateTxBundleAPI(apiBackend), }, } } diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 85006a073d..ad010d9ae8 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -383,6 +383,13 @@ func (b *backendMock) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) eve return nil } func (b *backendMock) SendTx(ctx context.Context, signedTx *types.Transaction) error { return nil } +func (b *backendMock) SimulateGaslessBundle(bundle *types.Bundle) (*types.SimulateGaslessBundleResp, error) { + panic("implement me") +} +func (b *backendMock) SendBundle(ctx context.Context, bundle *types.Bundle, originBundle *types.SendBundleArgs) error { + return nil +} +func (b *backendMock) BundlePrice() *big.Int { return nil } func (b *backendMock) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { return nil, [32]byte{}, 0, 0, nil } diff --git a/internal/flags/categories.go b/internal/flags/categories.go index fe2e6d29d4..0cf30d4b1f 100644 --- a/internal/flags/categories.go +++ b/internal/flags/categories.go @@ -24,6 +24,7 @@ const ( DevCategory = "DEVELOPER CHAIN" StateCategory = "STATE HISTORY MANAGEMENT" TxPoolCategory = "TRANSACTION POOL (EVM)" + BundlePoolCategory = "TRANSACTION POOL (BUNDLE)" BlobPoolCategory = "TRANSACTION POOL (BLOB)" PerfCategory = "PERFORMANCE TUNING" AccountCategory = "ACCOUNT" @@ -38,6 +39,7 @@ const ( MiscCategory = "MISC" TestingCategory = "TESTING" DeprecatedCategory = "ALIASED (deprecated)" + MEVCategory = "MEV" ) func init() { diff --git a/miner/bundle_cache.go b/miner/bundle_cache.go new file mode 100644 index 0000000000..54609682c5 --- /dev/null +++ b/miner/bundle_cache.go @@ -0,0 +1,88 @@ +package miner + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +const ( + maxHeaders = 3 +) + +type BundleCache struct { + mu sync.Mutex + entries []*BundleCacheEntry +} + +func NewBundleCache() *BundleCache { + return &BundleCache{ + entries: make([]*BundleCacheEntry, maxHeaders), + } +} + +func (b *BundleCache) GetBundleCache(header common.Hash) *BundleCacheEntry { + b.mu.Lock() + defer b.mu.Unlock() + + for _, entry := range b.entries { + if entry != nil && entry.headerHash == header { + return entry + } + } + newEntry := newCacheEntry(header) + b.entries = b.entries[1:] + b.entries = append(b.entries, newEntry) + + return newEntry +} + +type BundleCacheEntry struct { + mu sync.Mutex + headerHash common.Hash + successfulBundles map[common.Hash]*types.SimulatedBundle + failedBundles map[common.Hash]struct{} +} + +func newCacheEntry(header common.Hash) *BundleCacheEntry { + return &BundleCacheEntry{ + headerHash: header, + successfulBundles: make(map[common.Hash]*types.SimulatedBundle), + failedBundles: make(map[common.Hash]struct{}), + } +} + +func (c *BundleCacheEntry) GetSimulatedBundle(bundle common.Hash) (*types.SimulatedBundle, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + if simmed, ok := c.successfulBundles[bundle]; ok { + return simmed, true + } + + if _, ok := c.failedBundles[bundle]; ok { + return nil, true + } + + return nil, false +} + +func (c *BundleCacheEntry) UpdateSimulatedBundles(result map[common.Hash]*types.SimulatedBundle, bundles []*types.Bundle) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, bundle := range bundles { + if bundle == nil { + continue + } + + bundleHash := bundle.Hash() + + if result[bundleHash] != nil { + c.successfulBundles[bundleHash] = result[bundleHash] + } else { + c.failedBundles[bundleHash] = struct{}{} + } + } +} diff --git a/miner/miner.go b/miner/miner.go index 5c0ba3f815..2174d6ca81 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -19,7 +19,10 @@ package miner import ( "context" + "errors" "fmt" + "github.com/ethereum/go-ethereum/consensus/misc/eip1559" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "math/big" "sync" "time" @@ -41,10 +44,10 @@ import ( var ( commitDepositTxsTimer = metrics.NewRegisteredTimer("miner/commit/deposit/txs", nil) - packFromTxpoolTimer = metrics.NewRegisteredTimer("miner/pack/txpool/txs", nil) - commitTxpoolTxsTimer = metrics.NewRegisteredTimer("miner/commit/txpool/txs", nil) - assembleBlockTimer = metrics.NewRegisteredTimer("miner/assemble/block", nil) - buildBlockTimer = metrics.NewRegisteredTimer("miner/build/block", nil) + packFromTxpoolTimer = metrics.NewRegisteredTimer("miner/pack/txpool/txs", nil) + commitTxpoolTxsTimer = metrics.NewRegisteredTimer("miner/commit/txpool/txs", nil) + assembleBlockTimer = metrics.NewRegisteredTimer("miner/assemble/block", nil) + buildBlockTimer = metrics.NewRegisteredTimer("miner/build/block", nil) accountReadTimer = metrics.NewRegisteredTimer("miner/account/reads", nil) accountHashTimer = metrics.NewRegisteredTimer("miner/account/hashes", nil) @@ -64,6 +67,20 @@ var ( isBuildBlockInterruptCounter = metrics.NewRegisteredCounter("miner/build/interrupt", nil) ) +var defaultCoinBaseAddress = common.HexToAddress("0x4200000000000000000000000000000000000011") + +type MevConfig struct { + MevEnabled bool // Whether to enable Mev or not + MevReceivers []string // The list of Mev bundle receivers + MevBundleGasPriceFloor int64 // The minimal bundle gas Price +} + +var DefaultMevConfig = MevConfig{ + MevEnabled: false, + MevReceivers: nil, + MevBundleGasPriceFloor: 1, +} + // Backend wraps all methods required for mining. Only full node is capable // to offer all the functions here. type Backend interface { @@ -87,6 +104,8 @@ type Config struct { NewPayloadTimeout time.Duration // The maximum time allowance for creating a new payload RollupComputePendingBlock bool // Compute the pending block from tx-pool, instead of copying the latest-block + + Mev MevConfig // Mev configuration } // DefaultConfig contains default settings for miner. @@ -100,6 +119,8 @@ var DefaultConfig = Config{ // run 3 rounds. Recommit: 2 * time.Second, NewPayloadTimeout: 2 * time.Second, + + Mev: DefaultMevConfig, } // Miner creates blocks and searches for proof-of-work values. @@ -278,3 +299,91 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) { return miner.worker.buildPayload(args) } + +func (miner *Miner) SimulateBundle(bundle *types.Bundle) (*big.Int, error) { + + env, err := miner.prepareSimulationEnv() + if err != nil { + return nil, err + } + + s, err := miner.worker.simulateBundles(env, []*types.Bundle{bundle}) + if err != nil { + return nil, err + } + + if len(s) == 0 { + return nil, errors.New("no valid sim result") + } + + return s[0].BundleGasPrice, nil +} + +func (miner *Miner) SimulateGaslessBundle(bundle *types.Bundle) (*types.SimulateGaslessBundleResp, error) { + + env, err := miner.prepareSimulationEnv() + if err != nil { + return nil, err + } + + resp, err := miner.worker.simulateGaslessBundle(env, bundle) + if err != nil { + return nil, err + } + + return resp, nil +} + +func (miner *Miner) prepareSimulationEnv() (*environment, error) { + parent := miner.eth.BlockChain().CurrentBlock() + timestamp := int64(parent.Time + 1) + + header := &types.Header{ + ParentHash: parent.Hash(), + Number: new(big.Int).Add(parent.Number, common.Big1), + GasLimit: core.CalcGasLimit(parent.GasLimit, miner.worker.config.GasCeil), + Time: uint64(timestamp), + Coinbase: defaultCoinBaseAddress, + } + + // Set baseFee and GasLimit if we are on an EIP-1559 chain + if miner.worker.chainConfig.IsLondon(header.Number) { + header.BaseFee = eip1559.CalcBaseFee(miner.worker.chainConfig, parent, header.Time) + } + + if miner.worker.chainConfig.Optimism != nil && miner.worker.config.GasCeil != 0 { + // configure the gas limit of pending blocks with the miner gas limit config when using optimism + header.GasLimit = miner.worker.config.GasCeil + } + + // Apply EIP-4844, EIP-4788. + if miner.worker.chainConfig.IsCancun(header.Number, header.Time) { + var excessBlobGas uint64 + if miner.worker.chainConfig.IsCancun(parent.Number, parent.Time) { + excessBlobGas = eip4844.CalcExcessBlobGas(*parent.ExcessBlobGas, *parent.BlobGasUsed) + } else { + // For the first post-fork block, both parent.data_gas_used and parent.excess_data_gas are evaluated as 0 + excessBlobGas = eip4844.CalcExcessBlobGas(0, 0) + } + header.BlobGasUsed = new(uint64) + header.ExcessBlobGas = &excessBlobGas + } + + if err := miner.worker.engine.Prepare(miner.eth.BlockChain(), header); err != nil { + log.Error("Failed to prepare header for simulateBundle", "err", err) + return nil, err + } + + state, err := miner.eth.BlockChain().StateAt(parent.Root) + if err != nil { + return nil, err + } + + env := &environment{ + header: header, + state: state.Copy(), + signer: types.MakeSigner(miner.worker.chainConfig, header.Number, header.Time), + gasPool: prepareGasPool(), + } + return env, nil +} diff --git a/miner/worker.go b/miner/worker.go index fade5b084f..292a2e9ad0 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + mapset "github.com/deckarep/golang-set/v2" "math/big" "sync" "sync/atomic" @@ -78,10 +79,12 @@ const ( ) var ( - errBlockInterruptedByNewHead = errors.New("new head arrived while building block") - errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") - errBlockInterruptedByTimeout = errors.New("timeout while building block") - errBlockInterruptedByResolve = errors.New("payload resolution while building block") + errBlockInterruptedByNewHead = errors.New("new head arrived while building block") + errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") + errBlockInterruptedByTimeout = errors.New("timeout while building block") + errBlockInterruptedByResolve = errors.New("payload resolution while building block") + errBlockInterruptedByBundleCommit = errors.New("failed bundle commit while building block") + errFillBundleInterrupted = errors.New("fill bundle interrupted") ) var ( @@ -109,6 +112,8 @@ type environment struct { receipts []*types.Receipt sidecars []*types.BlobTxSidecar blobs int + + UnRevertible mapset.Set[common.Hash] } // copy creates a deep copy of environment. @@ -158,6 +163,7 @@ const ( commitInterruptResubmit commitInterruptTimeout commitInterruptResolve + commitInterruptBundleCommit ) // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. @@ -256,6 +262,9 @@ type worker struct { skipSealHook func(*task) bool // Method to decide whether skipping the sealing. fullTaskHook func() // Method to call before pushing the full sealing task. resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. + + // MEV + bundleCache *BundleCache } func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker { @@ -280,6 +289,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus exitCh: make(chan struct{}), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + bundleCache: NewBundleCache(), } // Subscribe for transaction insertion events (whether from network or resurrects) worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) @@ -822,6 +832,22 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* return receipt.Logs, nil } +func (w *worker) commitBundleTransaction(env *environment, tx *types.Transaction, unRevertible bool) ([]*types.Log, error) { + if tx.Type() == types.BlobTxType { + return w.commitBlobTransaction(env, tx) + } + receipt, err := w.applyTransaction(env, tx) + if err != nil { + return nil, err + } + if receipt.Status == types.ReceiptStatusFailed && unRevertible { + return nil, errors.New("no revertible transaction failed") + } + env.txs = append(env.txs, tx) + env.receipts = append(env.receipts, receipt) + return receipt.Logs, nil +} + func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) { sc := tx.BlobTxSidecar() if sc == nil { @@ -1192,17 +1218,41 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { timer := time.AfterFunc(w.newpayloadTimeout, func() { interrupt.Store(commitInterruptTimeout) }) - - err := w.fillTransactions(interrupt, work) - timer.Stop() // don't need timeout interruption any more - if errors.Is(err, errBlockInterruptedByTimeout) { - log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout), "parentHash", genParams.parentHash) - isBuildBlockInterruptCounter.Inc(1) - } else if errors.Is(err, errBlockInterruptedByResolve) { - log.Info("Block building got interrupted by payload resolution", "parentHash", genParams.parentHash) - isBuildBlockInterruptCounter.Inc(1) + if w.config.Mev.MevEnabled { + newWork := work.copy() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.fillTransactions(interrupt, newWork) + if errors.Is(err, errBlockInterruptedByTimeout) { + log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout), "parentHash", genParams.parentHash) + isBuildBlockInterruptCounter.Inc(1) + } else if errors.Is(err, errBlockInterruptedByResolve) { + log.Info("Block building got interrupted by payload resolution", "parentHash", genParams.parentHash) + isBuildBlockInterruptCounter.Inc(1) + } + }() + err := w.fillTransactionsAndBundles(interrupt, work) + wg.Wait() + timer.Stop() // don't need timeout interruption any more + if errors.Is(err, errFillBundleInterrupted) { + log.Warn("fill bundles is interrupted, discard", "err", err) + work = newWork + } + } else { + err := w.fillTransactions(interrupt, work) + timer.Stop() // don't need timeout interruption any more + if errors.Is(err, errBlockInterruptedByTimeout) { + log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout), "parentHash", genParams.parentHash) + isBuildBlockInterruptCounter.Inc(1) + } else if errors.Is(err, errBlockInterruptedByResolve) { + log.Info("Block building got interrupted by payload resolution", "parentHash", genParams.parentHash) + isBuildBlockInterruptCounter.Inc(1) + } } } + if intr := genParams.interrupt; intr != nil && genParams.isUpdate && intr.Load() != commitInterruptNone { return &newPayloadResult{err: errInterruptedUpdate} } @@ -1231,7 +1281,6 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult { innerExecutionTimer.Update(core.DebugInnerExecutionDuration) log.Debug("build payload statedb metrics", "parentHash", genParams.parentHash, "accountReads", common.PrettyDuration(work.state.AccountReads), "storageReads", common.PrettyDuration(work.state.StorageReads), "snapshotAccountReads", common.PrettyDuration(work.state.SnapshotAccountReads), "snapshotStorageReads", common.PrettyDuration(work.state.SnapshotStorageReads), "accountUpdates", common.PrettyDuration(work.state.AccountUpdates), "storageUpdates", common.PrettyDuration(work.state.StorageUpdates), "accountHashes", common.PrettyDuration(work.state.AccountHashes), "storageHashes", common.PrettyDuration(work.state.StorageHashes)) - return &newPayloadResult{ block: block, fees: totalFees(block, work.receipts), @@ -1407,6 +1456,8 @@ func signalToErr(signal int32) error { return errBlockInterruptedByTimeout case commitInterruptResolve: return errBlockInterruptedByResolve + case commitInterruptBundleCommit: + return errBlockInterruptedByBundleCommit default: panic(fmt.Errorf("undefined signal %d", signal)) } diff --git a/miner/worker_builder.go b/miner/worker_builder.go new file mode 100644 index 0000000000..7f59c8659f --- /dev/null +++ b/miner/worker_builder.go @@ -0,0 +1,450 @@ +package miner + +import ( + "errors" + mapset "github.com/deckarep/golang-set/v2" + "math/big" + "slices" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" +) + +var ( + errNonRevertingTxInBundleFailed = errors.New("non-reverting tx in bundle failed") + errBundlePriceTooLow = errors.New("bundle price too low") +) + +// fillTransactions retrieves the pending bundles and transactions from the txpool and fills them +// into the given sealing block. The selection and ordering strategy can be extended in the future. +func (w *worker) fillTransactionsAndBundles(interrupt *atomic.Int32, env *environment) error { + // TODO will remove after fix txpool perf issue + if interrupt != nil { + if signal := interrupt.Load(); signal != commitInterruptNone { + log.Warn("fill bundles interrupted by signal") + return errFillBundleInterrupted + } + } + + bundles := w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time) + + // if no bundles, not necessary to fill transactions + if len(bundles) == 0 { + log.Warn("no bundles in bundle pool") + return errFillBundleInterrupted + } + + txs, _, err := w.generateOrderedBundles(env, bundles) + if err != nil { + log.Error("fail to generate ordered bundles", "err", err) + return errFillBundleInterrupted + } + + if err = w.commitBundles(env, txs, interrupt); err != nil { + log.Error("fail to commit bundles", "err", err) + return errFillBundleInterrupted + } + log.Info("fill bundles", "bundles_count", len(bundles)) + + start := time.Now() + pending := w.eth.TxPool().Pending(true) + packFromTxpoolTimer.UpdateSince(start) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) + + // Split the pending transactions into locals and remotes. + localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending + + // Fill the block with all available pending transactions. + start = time.Now() + if len(localTxs) > 0 { + txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) + if err := w.commitTransactions(env, txs, interrupt); err != nil { + return err + } + } + if len(remoteTxs) > 0 { + txs := newTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) + if err := w.commitTransactions(env, txs, interrupt); err != nil { + return err + } + } + commitTxpoolTxsTimer.UpdateSince(start) + log.Debug("commitTxpoolTxsTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) + log.Info("fill bundles and transactions done", "total_txs_count", len(env.txs)) + return nil +} + +func (w *worker) commitBundles( + env *environment, + txs types.Transactions, + interrupt *atomic.Int32, +) error { + gasLimit := prepareGasPool() + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(gasLimit.Gas()) + } + var coalescedLogs []*types.Log + + for _, tx := range txs { + if interrupt != nil { + if signal := interrupt.Load(); signal != commitInterruptNone { + return errors.New("failed bundle commit due to payload timeout or resolve") + } + } + // If we don't have enough gas for any further transactions then we're done. + if env.gasPool.Gas() < params.TxGas { + return errors.New("not enough gas for further transactions") + } + if tx == nil { + return errors.New("unexpected nil transaction in bundle") + } + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. + from, _ := types.Sender(env.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { + return errors.New("unexpected protected transaction in bundle") + } + // Start executing the transaction + env.state.SetTxContext(tx.Hash(), env.tcount) + + logs, err := w.commitBundleTransaction(env, tx, env.UnRevertible.Contains(tx.Hash())) + switch err { + case core.ErrGasLimitReached: + // Pop the current out-of-gas transaction without shifting in the next from the account + log.Error("Unexpected gas limit exceeded for current block in the bundle", "sender", from) + return signalToErr(commitInterruptBundleCommit) + + case core.ErrNonceTooLow: + // New head notification data race between the transaction pool and miner, shift + log.Error("Transaction with low nonce in the bundle", "sender", from, "nonce", tx.Nonce()) + return signalToErr(commitInterruptBundleCommit) + + case core.ErrNonceTooHigh: + // Reorg notification data race between the transaction pool and miner, skip account = + log.Error("Account with high nonce in the bundle", "sender", from, "nonce", tx.Nonce()) + return signalToErr(commitInterruptBundleCommit) + + case nil: + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + continue + + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Error("Transaction failed in the bundle", "hash", tx.Hash(), "err", err) + return signalToErr(commitInterruptBundleCommit) + } + } + + if !w.isRunning() && len(coalescedLogs) > 0 { + // We don't push the pendingLogsEvent while we are mining. The reason is that + // when we are mining, the worker will regenerate a mining block every second. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(coalescedLogs)) + for i, l := range coalescedLogs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + w.pendingLogsFeed.Send(cpy) + } + return nil +} + +// generateOrderedBundles generates ordered txs from the given bundles. +// 1. sort bundles according to computed gas price when received. +// 2. simulate bundles based on the same state, resort. +// 3. merge resorted simulateBundles based on the iterative state. +func (w *worker) generateOrderedBundles( + env *environment, + bundles []*types.Bundle, +) (types.Transactions, *types.SimulatedBundle, error) { + // sort bundles according to gas price computed when received + slices.SortStableFunc(bundles, func(i, j *types.Bundle) int { + priceI, priceJ := i.Price, j.Price + return priceJ.Cmp(priceI) + }) + + // recompute bundle gas price based on the same state and current env + simulatedBundles, err := w.simulateBundles(env, bundles) + if err != nil { + log.Error("fail to simulate bundles base on the same state", "err", err) + return nil, nil, err + } + + // sort bundles according to fresh gas price + slices.SortStableFunc(simulatedBundles, func(i, j *types.SimulatedBundle) int { + priceI, priceJ := i.BundleGasPrice, j.BundleGasPrice + return priceJ.Cmp(priceI) + }) + + // merge bundles based on iterative state + includedTxs, mergedBundle, err := w.mergeBundles(env, simulatedBundles) + if err != nil { + log.Error("fail to merge bundles", "err", err) + return nil, nil, err + } + + return includedTxs, mergedBundle, nil +} + +func (w *worker) simulateBundles(env *environment, bundles []*types.Bundle) ([]*types.SimulatedBundle, error) { + headerHash := env.header.Hash() + simCache := w.bundleCache.GetBundleCache(headerHash) + simResult := make(map[common.Hash]*types.SimulatedBundle) + + var wg sync.WaitGroup + var mu sync.Mutex + for i, bundle := range bundles { + if simmed, ok := simCache.GetSimulatedBundle(bundle.Hash()); ok { + mu.Lock() + simResult[bundle.Hash()] = simmed + mu.Unlock() + continue + } + + wg.Add(1) + go func(idx int, bundle *types.Bundle, state *state.StateDB) { + defer wg.Done() + gasPool := prepareGasPool() + simmed, err := w.simulateBundle(env, bundle, state, gasPool, 0, true, true) + if err != nil { + log.Trace("Error computing gas for a simulateBundle", "error", err) + return + } + + mu.Lock() + defer mu.Unlock() + simResult[bundle.Hash()] = simmed + }(i, bundle, env.state.Copy()) + } + + wg.Wait() + + simulatedBundles := make([]*types.SimulatedBundle, 0) + + for _, bundle := range simResult { + if bundle == nil { + continue + } + + simulatedBundles = append(simulatedBundles, bundle) + } + + simCache.UpdateSimulatedBundles(simResult, bundles) + + return simulatedBundles, nil +} + +// mergeBundles merges the given simulateBundle into the given environment. +// It returns the merged simulateBundle and the number of transactions that were merged. +func (w *worker) mergeBundles( + env *environment, + bundles []*types.SimulatedBundle, +) (types.Transactions, *types.SimulatedBundle, error) { + currentState := env.state.Copy() + gasPool := prepareGasPool() + env.UnRevertible = mapset.NewSet[common.Hash]() + + includedTxs := types.Transactions{} + mergedBundle := types.SimulatedBundle{ + BundleGasFees: new(big.Int), + BundleGasUsed: 0, + BundleGasPrice: new(big.Int), + } + + for _, bundle := range bundles { + prevState := currentState.Copy() + prevGasPool := new(core.GasPool).AddGas(gasPool.Gas()) + + // the floor gas price is 99/100 what was simulated at the top of the block + floorGasPrice := new(big.Int).Mul(bundle.BundleGasPrice, big.NewInt(99)) + floorGasPrice = floorGasPrice.Div(floorGasPrice, big.NewInt(100)) + + simulatedBundle, err := w.simulateBundle(env, bundle.OriginalBundle, currentState, gasPool, len(includedTxs), true, false) + + if err != nil && errors.Is(err, core.ErrGasLimitReached) { + log.Error("failed to merge bundle, interrupt merge process", "err", err) + break + } + if err != nil || simulatedBundle.BundleGasPrice.Cmp(floorGasPrice) <= 0 { + currentState = prevState + gasPool = prevGasPool + + log.Error("failed to merge bundle", "floorGasPrice", floorGasPrice, "err", err) + continue + } + + log.Info("included bundle", + "gasUsed", simulatedBundle.BundleGasUsed, + "gasPrice", simulatedBundle.BundleGasPrice, + "txcount", len(simulatedBundle.OriginalBundle.Txs)) + + includedTxs = append(includedTxs, bundle.OriginalBundle.Txs...) + + mergedBundle.BundleGasFees.Add(mergedBundle.BundleGasFees, simulatedBundle.BundleGasFees) + mergedBundle.BundleGasUsed += simulatedBundle.BundleGasUsed + + for _, tx := range includedTxs { + if !containsHash(bundle.OriginalBundle.RevertingTxHashes, tx.Hash()) { + env.UnRevertible.Add(tx.Hash()) + } + } + } + + if len(includedTxs) == 0 { + return nil, nil, errors.New("include no txs when merge bundles") + } + + mergedBundle.BundleGasPrice.Div(mergedBundle.BundleGasFees, new(big.Int).SetUint64(mergedBundle.BundleGasUsed)) + + return includedTxs, &mergedBundle, nil +} + +// simulateBundle computes the gas price for a whole simulateBundle based on the same ctx +// named computeBundleGas in flashbots +func (w *worker) simulateBundle( + env *environment, bundle *types.Bundle, state *state.StateDB, gasPool *core.GasPool, currentTxCount int, + prune, pruneGasExceed bool, +) (*types.SimulatedBundle, error) { + var ( + tempGasUsed uint64 + bundleGasUsed uint64 + bundleGasFees = new(big.Int) + ) + + for i, tx := range bundle.Txs { + state.SetTxContext(tx.Hash(), i+currentTxCount) + + receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &w.coinbase, gasPool, state, env.header, tx, + &tempGasUsed, *w.chain.GetVMConfig()) + if err != nil { + log.Warn("fail to simulate bundle", "hash", bundle.Hash().String(), "err", err) + + if prune { + if errors.Is(err, core.ErrGasLimitReached) && !pruneGasExceed { + log.Warn("bundle gas limit exceed", "hash", bundle.Hash().String()) + } else { + log.Warn("prune bundle", "hash", bundle.Hash().String(), "err", err) + w.eth.TxPool().PruneBundle(bundle.Hash()) + } + } + + return nil, err + } + + if receipt.Status == types.ReceiptStatusFailed && !containsHash(bundle.RevertingTxHashes, receipt.TxHash) { + err = errNonRevertingTxInBundleFailed + log.Warn("fail to simulate bundle", "hash", bundle.Hash().String(), "err", err) + + if prune { + w.eth.TxPool().PruneBundle(bundle.Hash()) + log.Warn("prune bundle", "hash", bundle.Hash().String()) + } + + return nil, err + } + if !w.eth.TxPool().Has(tx.Hash()) { + bundleGasUsed += receipt.GasUsed + + txGasUsed := new(big.Int).SetUint64(receipt.GasUsed) + effectiveTip, er := tx.EffectiveGasTip(env.header.BaseFee) + if er != nil { + return nil, er + } + if env.header.BaseFee != nil { + effectiveTip.Add(effectiveTip, env.header.BaseFee) + } + txGasFees := new(big.Int).Mul(txGasUsed, effectiveTip) + bundleGasFees.Add(bundleGasFees, txGasFees) + } + } + // if all txs in the bundle are from txpool, we accept the bundle without checking gas price + bundleGasPrice := big.NewInt(0) + if bundleGasUsed != 0 { + bundleGasPrice = new(big.Int).Div(bundleGasFees, new(big.Int).SetUint64(bundleGasUsed)) + } + + if bundleGasPrice.Cmp(big.NewInt(w.config.Mev.MevBundleGasPriceFloor)) < 0 { + err := errBundlePriceTooLow + log.Warn("fail to simulate bundle", "hash", bundle.Hash().String(), "err", err) + + if prune { + log.Warn("prune bundle", "hash", bundle.Hash().String()) + w.eth.TxPool().PruneBundle(bundle.Hash()) + } + + return nil, err + } + + return &types.SimulatedBundle{ + OriginalBundle: bundle, + BundleGasFees: bundleGasFees, + BundleGasPrice: bundleGasPrice, + BundleGasUsed: bundleGasUsed, + }, nil +} + +func (w *worker) simulateGaslessBundle(env *environment, bundle *types.Bundle) (*types.SimulateGaslessBundleResp, error) { + result := make([]types.GaslessTxSimResult, 0) + + txIdx := 0 + for _, tx := range bundle.Txs { + env.state.SetTxContext(tx.Hash(), txIdx) + + var ( + snap = env.state.Snapshot() + gp = env.gasPool.Gas() + ) + + receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &w.coinbase, env.gasPool, env.state, env.header, tx, + &env.header.GasUsed, *w.chain.GetVMConfig()) + if err != nil { + env.state.RevertToSnapshot(snap) + env.gasPool.SetGas(gp) + log.Info("fail to simulate gasless bundle, skipped", "txHash", tx.Hash(), "err", err) + } else { + txIdx++ + result = append(result, types.GaslessTxSimResult{ + Hash: tx.Hash(), + GasUsed: receipt.GasUsed, + }) + } + } + + return &types.SimulateGaslessBundleResp{ + ValidResults: result, + BasedBlockNumber: env.header.Number.Int64(), + }, nil +} + +func containsHash(arr []common.Hash, match common.Hash) bool { + for _, elem := range arr { + if elem == match { + return true + } + } + return false +} + +func prepareGasPool() *core.GasPool { + gasPool := new(core.GasPool).AddGas(params.BundleGasLimit) + return gasPool +} diff --git a/params/config.go b/params/config.go index 150da88d72..9eaf2c7e9a 100644 --- a/params/config.go +++ b/params/config.go @@ -32,7 +32,7 @@ var ( OPBNBMainNetGenesisHash = common.HexToHash("0x4dd61178c8b0f01670c231597e7bcb368e84545acd46d940a896d6a791dd6df4") OPBNBTestNetGenesisHash = common.HexToHash("0x51fa57729dfb1c27542c21b06cb72a0459c57440ceb43a465dae1307cd04fe80") - OPBNBQANetGenesisHash = common.HexToHash("0xe182e685b1ec05ca55f2374cb3a190d1ae8f3e196acb55a69efd61536fc3983f") + OPBNBQANetGenesisHash = common.HexToHash("0xfd822cb9ed44eac6bb4c5413cc34b247d814ba4d162610f434a19d5ba68749fd") ) const ( @@ -236,10 +236,11 @@ var ( CancunTime: newUint64(1715754600), // May-15-2024 06:30 AM +UTC EcotoneTime: newUint64(1715754600), // May-15-2024 06:30 AM +UTC HaberTime: newUint64(1717048800), // May-30-2024 06:00 AM +UTC + WrightTime: newUint64(1723701600), // Aug-15-2024 06:00 AM +UTC } // OPBNBQANetConfig is the chain parameters to run a node on the opBNB qa network. It is just for internal test. OPBNBQANetConfig = &ChainConfig{ - ChainID: big.NewInt(2484), + ChainID: big.NewInt(7180), HomesteadBlock: big.NewInt(0), EIP150Block: big.NewInt(0), EIP155Block: big.NewInt(0), @@ -270,6 +271,7 @@ var ( CancunTime: newUint64(1714995000), // May-06-2024 11:30 AM +UTC EcotoneTime: newUint64(1714995000), // May-06-2024 11:30 AM +UTC HaberTime: newUint64(1716361200), // May-22-2024 07:00 AM +UTC + WrightTime: newUint64(1721815200), // July-24-2024 10:00 AM +UTC } // AllEthashProtocolChanges contains every protocol change (EIPs) introduced @@ -482,6 +484,7 @@ type ChainConfig struct { // Delta: the Delta upgrade does not affect the execution-layer, and is thus not configurable in the chain config. EcotoneTime *uint64 `json:"ecotoneTime,omitempty"` // Ecotone switch time (nil = no fork, 0 = already on optimism ecotone) HaberTime *uint64 `json:"haberTime,omitempty"` // Haber switch time (nil = no fork, 0 = already on haber) + WrightTime *uint64 `json:"wrightTime,omitempty"` // Wright switch time (nil = no fork, 0 = already on wright) InteropTime *uint64 `json:"interopTime,omitempty"` // Interop switch time (nil = no fork, 0 = already on optimism interop) @@ -652,6 +655,9 @@ func (c *ChainConfig) Description() string { if c.HaberTime != nil { banner += fmt.Sprintf(" - Haber: @%-10v\n", *c.HaberTime) } + if c.WrightTime != nil { + banner += fmt.Sprintf(" - Wright: @%-10v\n", *c.WrightTime) + } return banner } @@ -777,6 +783,10 @@ func (c *ChainConfig) IsHaber(time uint64) bool { return isTimestampForked(c.HaberTime, time) } +func (c *ChainConfig) IsWright(time uint64) bool { + return isTimestampForked(c.WrightTime, time) +} + func (c *ChainConfig) IsInterop(time uint64) bool { return isTimestampForked(c.InteropTime, time) } @@ -1140,6 +1150,7 @@ type Rules struct { IsOptimismCanyon bool IsFermat bool IsHaber bool + IsWright bool } // Rules ensures c's ChainID is not nil. @@ -1172,5 +1183,6 @@ func (c *ChainConfig) Rules(num *big.Int, isMerge bool, timestamp uint64) Rules // OPBNB IsFermat: c.IsFermat(num), IsHaber: c.IsHaber(timestamp), + IsWright: c.IsWright(timestamp), } } diff --git a/params/protocol_params.go b/params/protocol_params.go index 49805a4d16..b1dd4ea05c 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -34,6 +34,7 @@ const ( MinGasLimit uint64 = 5000 // Minimum the gas limit may ever be. MaxGasLimit uint64 = 0x7fffffffffffffff // Maximum the gas limit (2^63-1). GenesisGasLimit uint64 = 4712388 // Gas limit of the Genesis block. + BundleGasLimit uint64 = 10000000 // Gas limit of bundle. MaximumExtraDataSize uint64 = 32 // Maximum size extra data may be after Genesis. ExpByteGas uint64 = 10 // Times ceil(log256(exponent)) for the EXP instruction. @@ -134,6 +135,7 @@ const ( DefaultBaseFeeChangeDenominator = 8 // Bounds the amount the base fee can change between blocks. DefaultElasticityMultiplier = 2 // Bounds the maximum gas limit an EIP-1559 block may have. InitialBaseFee = 1000000000 // Initial base fee for EIP-1559 blocks. + OpBNBBaseFeeForGasLess = 0 // Initial base fee for opBNB gasless. MaxCodeSize = 24576 // Maximum bytecode to permit for a contract MaxInitCodeSize = 2 * MaxCodeSize // Maximum initcode to permit in a creation transaction and create instructions