diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1bd369d75216..a53ab6d87bad 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -566,6 +566,12 @@ var ( Value: ethconfig.Defaults.Miner.NewPayloadTimeout, Category: flags.MinerCategory, } + MinerTrustedRelaysFlag = &cli.StringFlag{ + Name: "miner.trustedrelays", + Usage: "flashbots - The Ethereum addresses of trusted relays for signature verification. The miner will accept signed bundles and other tasks from the relay, being reasonably certain about DDoS safety.", + Value: "0x870e2734DdBe2Fba9864f33f3420d59Bc641f2be", + Category: flags.MinerCategory, + } // Account settings UnlockedAccountFlag = &cli.StringFlag{ @@ -1663,6 +1669,15 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) { if ctx.IsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.Duration(TxPoolLifetimeFlag.Name) } + + addresses := strings.Split(ctx.String(MinerTrustedRelaysFlag.Name), ",") + for _, address := range addresses { + if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) { + Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed) + } else { + cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed)) + } + } } func setEthash(ctx *cli.Context, cfg *ethconfig.Config) { @@ -1716,7 +1731,17 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) { cfg.NewPayloadTimeout = ctx.Duration(MinerNewPayloadTimeout.Name) } - cfg.MaxMergedBundles = ctx.Int(MinerMaxMergedBundles.Name) + cfg.MaxMergedBundles = ctx.Int(MinerMaxMergedBundlesFlag.Name) + + addresses := strings.Split(ctx.String(MinerTrustedRelaysFlag.Name), ",") + for _, address := range addresses { + if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) { + Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed) + } else { + cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed)) + } + } + log.Info("Trusted relays set as", "addresses", cfg.TrustedRelays) } func setRequiredBlocks(ctx *cli.Context, cfg *ethconfig.Config) { diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 2d4a16f55ec9..ff0175b3ba0c 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -168,6 +168,8 @@ type Config struct { GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + + TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config. } // DefaultConfig contains the default configurations for the transaction @@ -614,6 +616,52 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m return nil } +// AddMegaBundle adds a megabundle to the pool. Assumes the relay signature has been verified already. +func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error { + pool.mu.Lock() + defer pool.mu.Unlock() + + fromTrustedRelay := false + for _, trustedAddr := range pool.config.TrustedRelays { + if relayAddr == trustedAddr { + fromTrustedRelay = true + } + } + if !fromTrustedRelay { + return errors.New("megabundle from non-trusted address") + } + + pool.megabundles[relayAddr] = types.MevBundle{ + Txs: txs, + BlockNumber: blockNumber, + MinTimestamp: minTimestamp, + MaxTimestamp: maxTimestamp, + RevertingTxHashes: revertingTxHashes, + } + return nil +} + +// GetMegabundle returns the latest megabundle submitted by a given relay. +func (pool *TxPool) GetMegabundle(relayAddr common.Address, blockNumber *big.Int, blockTimestamp uint64) (types.MevBundle, error) { + pool.mu.Lock() + defer pool.mu.Unlock() + + megabundle, ok := pool.megabundles[relayAddr] + if !ok { + return types.MevBundle{}, errors.New("No megabundle found") + } + if megabundle.BlockNumber.Cmp(blockNumber) != 0 { + return types.MevBundle{}, errors.New("Megabundle does not fit blockNumber constraints") + } + if megabundle.MinTimestamp != 0 && megabundle.MinTimestamp > blockTimestamp { + return types.MevBundle{}, errors.New("Megabundle does not fit minTimestamp constraints") + } + if megabundle.MaxTimestamp != 0 && megabundle.MaxTimestamp < blockTimestamp { + return types.MevBundle{}, errors.New("Megabundle does not fit maxTimestamp constraints") + } + return megabundle, nil +} + // Locals retrieves the accounts currently considered local by the pool. func (pool *TxPool) Locals() []common.Address { pool.mu.Lock() diff --git a/eth/api_backend.go b/eth/api_backend.go index 7dae1ec3587b..e0f54870dd57 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -269,6 +269,10 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) } +func (b *EthAPIBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { + return b.eth.txPool.AddMegabundle(relayAddr, txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) +} + func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { pending := b.eth.txPool.Pending(false) var txs types.Transactions diff --git a/go.sum b/go.sum index 1b44de026d22..f4e4b3248f00 100644 --- a/go.sum +++ b/go.sum @@ -859,6 +859,7 @@ gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3M gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0= gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 4fe1ba1ca379..d3c3b2e0a4f7 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -2110,7 +2110,7 @@ func NewPrivateTxBundleAPI(b Backend) *PrivateTxBundleAPI { return &PrivateTxBundleAPI{b} } -// SendBundleArgs represents the arguments for a call. +// SendBundleArgs represents the arguments for a SendBundle call. type SendBundleArgs struct { Txs []hexutil.Bytes `json:"txs"` BlockNumber rpc.BlockNumber `json:"blockNumber"` @@ -2119,6 +2119,25 @@ type SendBundleArgs struct { RevertingTxHashes []common.Hash `json:"revertingTxHashes"` } +// SendMegabundleArgs represents the arguments for a SendMegabundle call. +type SendMegabundleArgs struct { + Txs []hexutil.Bytes `json:"txs"` + BlockNumber uint64 `json:"blockNumber"` + MinTimestamp *uint64 `json:"minTimestamp"` + MaxTimestamp *uint64 `json:"maxTimestamp"` + RevertingTxHashes []common.Hash `json:"revertingTxHashes"` + RelaySignature hexutil.Bytes `json:"relaySignature"` +} + +// UnsignedMegabundle is used for serialization and subsequent digital signing. +type UnsignedMegabundle struct { + Txs []hexutil.Bytes + BlockNumber uint64 + MinTimestamp uint64 + MaxTimestamp uint64 + RevertingTxHashes []common.Hash +} + // SendBundle will add the signed transaction to the transaction pool. // The sender is responsible for signing the transaction and using the correct nonce and ensuring validity func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) error { @@ -2148,3 +2167,58 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes) } + +// Recovers the Ethereum address of the trusted relay that signed the megabundle. +func RecoverRelayAddress(args SendMegabundleArgs) (common.Address, error) { + megabundle := UnsignedMegabundle{Txs: args.Txs, BlockNumber: args.BlockNumber, RevertingTxHashes: args.RevertingTxHashes} + if args.MinTimestamp != nil { + megabundle.MinTimestamp = *args.MinTimestamp + } else { + megabundle.MinTimestamp = 0 + } + if args.MaxTimestamp != nil { + megabundle.MaxTimestamp = *args.MaxTimestamp + } else { + megabundle.MaxTimestamp = 0 + } + rlpEncoding, _ := rlp.EncodeToBytes(megabundle) + signature := args.RelaySignature + signature[64] -= 27 // account for Ethereum V + recoveredPubkey, err := crypto.SigToPub(accounts.TextHash(rlpEncoding), args.RelaySignature) + if err != nil { + return common.Address{}, err + } + return crypto.PubkeyToAddress(*recoveredPubkey), nil +} + +// SendMegabundle will add the signed megabundle to one of the workers for evaluation. +func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabundleArgs) error { + log.Info("Received a Megabundle request", "signature", args.RelaySignature) + var txs types.Transactions + if len(args.Txs) == 0 { + return errors.New("megabundle missing txs") + } + if args.BlockNumber == 0 { + return errors.New("megabundle missing blockNumber") + } + for _, encodedTx := range args.Txs { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + return err + } + txs = append(txs, tx) + } + var minTimestamp, maxTimestamp uint64 + if args.MinTimestamp != nil { + minTimestamp = *args.MinTimestamp + } + if args.MaxTimestamp != nil { + maxTimestamp = *args.MaxTimestamp + } + relayAddr, err := RecoverRelayAddress(args) + log.Info("Megabundle", "relayAddr", relayAddr, "err", err) + if err != nil { + return err + } + return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr) +} diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 24a48658d9df..4ef1f18498a8 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -76,6 +76,7 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error + SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 65ac6f324a38..6b65bb1f362b 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -212,6 +212,10 @@ type backendMock struct { config *params.ChainConfig } +func (b *backendMock) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { + return nil +} + func (b *backendMock) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { return nil } diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 388eed8a2fa6..635b6470c7f4 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -611,6 +611,11 @@ web3._extend({ params: 3, inputFormatter: [web3._extend.formatters.inputCallFormatter, web3._extend.formatters.inputDefaultBlockNumberFormatter, null], }), + new web3._extend.Method({ + name: 'sendMegabundle', + call: 'eth_sendMegabundle', + params: 1 + }), ], properties: [ new web3._extend.Property({ diff --git a/les/api_backend.go b/les/api_backend.go index b574b1df6190..117a74029d7b 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -200,10 +200,15 @@ func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) func (b *LesApiBackend) RemoveTx(txHash common.Hash) { b.eth.txPool.RemoveTx(txHash) } + func (b *LesApiBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) } +func (b *LesApiBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { + return nil +} + func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) { return b.eth.txPool.GetTransactions() } diff --git a/miner/multi_worker.go b/miner/multi_worker.go index 9a39983c5a43..050ea38af4e5 100644 --- a/miner/multi_worker.go +++ b/miner/multi_worker.go @@ -98,13 +98,24 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons for i := 1; i <= config.MaxMergedBundles; i++ { workers = append(workers, newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ - isFlashbots: true, - queue: queue, - maxMergedBundles: i, + isFlashbots: true, + isMegabundleWorker: false, + queue: queue, + maxMergedBundles: i, })) } - log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "worker", len(workers)) + for i := 0; i < len(config.TrustedRelays); i++ { + workers = append(workers, + newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ + isFlashbots: true, + isMegabundleWorker: true, + queue: queue, + relayAddr: config.TrustedRelays[i], + })) + } + + log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers)) return &multiWorker{ regularWorker: regularWorker, workers: workers, @@ -112,7 +123,9 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons } type flashbotsData struct { - isFlashbots bool - queue chan *task - maxMergedBundles int + isFlashbots bool + isMegabundleWorker bool + queue chan *task + maxMergedBundles int + relayAddr common.Address } diff --git a/miner/worker.go b/miner/worker.go index 5fb615bed344..0141dfee4749 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -164,9 +164,10 @@ type task struct { block *types.Block createdAt time.Time - profit *big.Int - isFlashbots bool - worker int + profit *big.Int + isFlashbots bool + worker int + isMegabundle bool } const ( @@ -773,7 +774,7 @@ func (w *worker) taskLoop() { // Interrupt previous sealing operation interrupt() stopCh, prev = make(chan struct{}), sealHash - log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker) + log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker, "isMegabundle", task.isMegabundle) if w.skipSealHook != nil && w.skipSealHook(task) { continue } @@ -1286,7 +1287,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC return err } } - if w.flashbots.isFlashbots { + if w.flashbots.isFlashbots && !w.flashbots.isMegabundleWorker { bundles, err := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time) if err != nil { log.Error("Failed to fetch pending transactions", "err", err) @@ -1305,8 +1306,42 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC if err := w.commitBundle(env, bundleTxs, interrupt); err != nil { return err } - env.profit.Add(env.profit, bundle.totalEth) + env.profit.Add(env.profit, bundle.ethSentToCoinbase) } + if w.flashbots.isMegabundleWorker { + megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, env.header.Number, env.header.Time) + log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err) + if err != nil { + return err // no valid megabundle for this relay, nothing to do + } + // Flashbots bundle merging duplicates work by simulating TXes and then committing them once more. + // Megabundles API focuses on speed and runs everything in one cycle. + coinbaseBalanceBefore := env.state.GetBalance(env.coinbase) + if err := w.commitBundle(env, megabundle.Txs, interrupt); err != nil { + log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "err", err) + return err + } + var txStatuses = map[common.Hash]bool{} + for _, receipt := range env.receipts { + txStatuses[receipt.TxHash] = receipt.Status == types.ReceiptStatusSuccessful + } + for _, tx := range megabundle.Txs { + status, ok := txStatuses[tx.Hash()] + if !ok { + log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash()) + return errors.New("no tx receipt after megabundle simulation") + } + if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) { + log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash()) + return errors.New("megabundle contains failing tx") + } + } + coinbaseBalanceAfter := env.state.GetBalance(env.coinbase) + coinbaseDelta := big.NewInt(0).Sub(coinbaseBalanceAfter, coinbaseBalanceBefore) + env.profit = coinbaseDelta + log.Info("Megabundle processed", "relay", w.flashbots.relayAddr, "totalProfit", ethIntToFloat(env.profit)) + } + if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) if err := w.commitTransactions(env, txs, interrupt); err != nil { @@ -1466,7 +1501,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti // If we're post merge, just ignore if !w.isTTDReached(block.Header()) { select { - case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles}: + case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles, isMegabundle: w.flashbots.isMegabundleWorker}: w.unconfirmed.Shift(block.NumberU64() - 1) fees := totalFees(block, env.receipts)