Skip to content

Commit

Permalink
Flashbots changes v0.3 to v0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
bogatyy authored and avalonche committed Mar 8, 2023
1 parent 91f1898 commit 6fff7b4
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 16 deletions.
27 changes: 26 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,12 @@ var (
Value: ethconfig.Defaults.Miner.NewPayloadTimeout,
Category: flags.MinerCategory,
}
MinerTrustedRelaysFlag = &cli.StringFlag{
Name: "miner.trustedrelays",
Usage: "flashbots - The Ethereum addresses of trusted relays for signature verification. The miner will accept signed bundles and other tasks from the relay, being reasonably certain about DDoS safety.",
Value: "0x870e2734DdBe2Fba9864f33f3420d59Bc641f2be",
Category: flags.MinerCategory,
}

// Account settings
UnlockedAccountFlag = &cli.StringFlag{
Expand Down Expand Up @@ -1663,6 +1669,15 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
if ctx.IsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.Duration(TxPoolLifetimeFlag.Name)
}

addresses := strings.Split(ctx.String(MinerTrustedRelaysFlag.Name), ",")
for _, address := range addresses {
if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) {
Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed)
} else {
cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed))
}
}
}

func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down Expand Up @@ -1716,7 +1731,17 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) {
cfg.NewPayloadTimeout = ctx.Duration(MinerNewPayloadTimeout.Name)
}

cfg.MaxMergedBundles = ctx.Int(MinerMaxMergedBundles.Name)
cfg.MaxMergedBundles = ctx.Int(MinerMaxMergedBundlesFlag.Name)

addresses := strings.Split(ctx.String(MinerTrustedRelaysFlag.Name), ",")
for _, address := range addresses {
if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) {
Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed)
} else {
cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed))
}
}
log.Info("Trusted relays set as", "addresses", cfg.TrustedRelays)
}

func setRequiredBlocks(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
48 changes: 48 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ type Config struct {
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued

TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config.
}

// DefaultConfig contains the default configurations for the transaction
Expand Down Expand Up @@ -614,6 +616,52 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
return nil
}

// AddMegaBundle adds a megabundle to the pool. Assumes the relay signature has been verified already.
func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
pool.mu.Lock()
defer pool.mu.Unlock()

fromTrustedRelay := false
for _, trustedAddr := range pool.config.TrustedRelays {
if relayAddr == trustedAddr {
fromTrustedRelay = true
}
}
if !fromTrustedRelay {
return errors.New("megabundle from non-trusted address")
}

pool.megabundles[relayAddr] = types.MevBundle{
Txs: txs,
BlockNumber: blockNumber,
MinTimestamp: minTimestamp,
MaxTimestamp: maxTimestamp,
RevertingTxHashes: revertingTxHashes,
}
return nil
}

// GetMegabundle returns the latest megabundle submitted by a given relay.
func (pool *TxPool) GetMegabundle(relayAddr common.Address, blockNumber *big.Int, blockTimestamp uint64) (types.MevBundle, error) {
pool.mu.Lock()
defer pool.mu.Unlock()

megabundle, ok := pool.megabundles[relayAddr]
if !ok {
return types.MevBundle{}, errors.New("No megabundle found")
}
if megabundle.BlockNumber.Cmp(blockNumber) != 0 {
return types.MevBundle{}, errors.New("Megabundle does not fit blockNumber constraints")
}
if megabundle.MinTimestamp != 0 && megabundle.MinTimestamp > blockTimestamp {
return types.MevBundle{}, errors.New("Megabundle does not fit minTimestamp constraints")
}
if megabundle.MaxTimestamp != 0 && megabundle.MaxTimestamp < blockTimestamp {
return types.MevBundle{}, errors.New("Megabundle does not fit maxTimestamp constraints")
}
return megabundle, nil
}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *TxPool) Locals() []common.Address {
pool.mu.Lock()
Expand Down
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions,
return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

func (b *EthAPIBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error {
return b.eth.txPool.AddMegabundle(relayAddr, txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(false)
var txs types.Transactions
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3M
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU=
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0=
gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
76 changes: 75 additions & 1 deletion internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2110,7 +2110,7 @@ func NewPrivateTxBundleAPI(b Backend) *PrivateTxBundleAPI {
return &PrivateTxBundleAPI{b}
}

// SendBundleArgs represents the arguments for a call.
// SendBundleArgs represents the arguments for a SendBundle call.
type SendBundleArgs struct {
Txs []hexutil.Bytes `json:"txs"`
BlockNumber rpc.BlockNumber `json:"blockNumber"`
Expand All @@ -2119,6 +2119,25 @@ type SendBundleArgs struct {
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
}

// SendMegabundleArgs represents the arguments for a SendMegabundle call.
type SendMegabundleArgs struct {
Txs []hexutil.Bytes `json:"txs"`
BlockNumber uint64 `json:"blockNumber"`
MinTimestamp *uint64 `json:"minTimestamp"`
MaxTimestamp *uint64 `json:"maxTimestamp"`
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
RelaySignature hexutil.Bytes `json:"relaySignature"`
}

// UnsignedMegabundle is used for serialization and subsequent digital signing.
type UnsignedMegabundle struct {
Txs []hexutil.Bytes
BlockNumber uint64
MinTimestamp uint64
MaxTimestamp uint64
RevertingTxHashes []common.Hash
}

// SendBundle will add the signed transaction to the transaction pool.
// The sender is responsible for signing the transaction and using the correct nonce and ensuring validity
func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) error {
Expand Down Expand Up @@ -2148,3 +2167,58 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs

return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes)
}

// Recovers the Ethereum address of the trusted relay that signed the megabundle.
func RecoverRelayAddress(args SendMegabundleArgs) (common.Address, error) {
megabundle := UnsignedMegabundle{Txs: args.Txs, BlockNumber: args.BlockNumber, RevertingTxHashes: args.RevertingTxHashes}
if args.MinTimestamp != nil {
megabundle.MinTimestamp = *args.MinTimestamp
} else {
megabundle.MinTimestamp = 0
}
if args.MaxTimestamp != nil {
megabundle.MaxTimestamp = *args.MaxTimestamp
} else {
megabundle.MaxTimestamp = 0
}
rlpEncoding, _ := rlp.EncodeToBytes(megabundle)
signature := args.RelaySignature
signature[64] -= 27 // account for Ethereum V
recoveredPubkey, err := crypto.SigToPub(accounts.TextHash(rlpEncoding), args.RelaySignature)
if err != nil {
return common.Address{}, err
}
return crypto.PubkeyToAddress(*recoveredPubkey), nil
}

// SendMegabundle will add the signed megabundle to one of the workers for evaluation.
func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabundleArgs) error {
log.Info("Received a Megabundle request", "signature", args.RelaySignature)
var txs types.Transactions
if len(args.Txs) == 0 {
return errors.New("megabundle missing txs")
}
if args.BlockNumber == 0 {
return errors.New("megabundle missing blockNumber")
}
for _, encodedTx := range args.Txs {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(encodedTx); err != nil {
return err
}
txs = append(txs, tx)
}
var minTimestamp, maxTimestamp uint64
if args.MinTimestamp != nil {
minTimestamp = *args.MinTimestamp
}
if args.MaxTimestamp != nil {
maxTimestamp = *args.MaxTimestamp
}
relayAddr, err := RecoverRelayAddress(args)
log.Info("Megabundle", "relayAddr", relayAddr, "err", err)
if err != nil {
return err
}
return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr)
}
1 change: 1 addition & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Backend interface {
// Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error
SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction
Expand Down
4 changes: 4 additions & 0 deletions internal/ethapi/transaction_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ type backendMock struct {
config *params.ChainConfig
}

func (b *backendMock) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error {
return nil
}

func (b *backendMock) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ web3._extend({
params: 3,
inputFormatter: [web3._extend.formatters.inputCallFormatter, web3._extend.formatters.inputDefaultBlockNumberFormatter, null],
}),
new web3._extend.Method({
name: 'sendMegabundle',
call: 'eth_sendMegabundle',
params: 1
}),
],
properties: [
new web3._extend.Property({
Expand Down
5 changes: 5 additions & 0 deletions les/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,15 @@ func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
func (b *LesApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txPool.RemoveTx(txHash)
}

func (b *LesApiBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

func (b *LesApiBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error {
return nil
}

func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) {
return b.eth.txPool.GetTransactions()
}
Expand Down
27 changes: 20 additions & 7 deletions miner/multi_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,34 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
for i := 1; i <= config.MaxMergedBundles; i++ {
workers = append(workers,
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{
isFlashbots: true,
queue: queue,
maxMergedBundles: i,
isFlashbots: true,
isMegabundleWorker: false,
queue: queue,
maxMergedBundles: i,
}))
}

log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "worker", len(workers))
for i := 0; i < len(config.TrustedRelays); i++ {
workers = append(workers,
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{
isFlashbots: true,
isMegabundleWorker: true,
queue: queue,
relayAddr: config.TrustedRelays[i],
}))
}

log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers))
return &multiWorker{
regularWorker: regularWorker,
workers: workers,
}
}

type flashbotsData struct {
isFlashbots bool
queue chan *task
maxMergedBundles int
isFlashbots bool
isMegabundleWorker bool
queue chan *task
maxMergedBundles int
relayAddr common.Address
}
49 changes: 42 additions & 7 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ type task struct {
block *types.Block
createdAt time.Time

profit *big.Int
isFlashbots bool
worker int
profit *big.Int
isFlashbots bool
worker int
isMegabundle bool
}

const (
Expand Down Expand Up @@ -773,7 +774,7 @@ func (w *worker) taskLoop() {
// Interrupt previous sealing operation
interrupt()
stopCh, prev = make(chan struct{}), sealHash
log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker)
log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker, "isMegabundle", task.isMegabundle)
if w.skipSealHook != nil && w.skipSealHook(task) {
continue
}
Expand Down Expand Up @@ -1286,7 +1287,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC
return err
}
}
if w.flashbots.isFlashbots {
if w.flashbots.isFlashbots && !w.flashbots.isMegabundleWorker {
bundles, err := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time)
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
Expand All @@ -1305,8 +1306,42 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC
if err := w.commitBundle(env, bundleTxs, interrupt); err != nil {
return err
}
env.profit.Add(env.profit, bundle.totalEth)
env.profit.Add(env.profit, bundle.ethSentToCoinbase)
}
if w.flashbots.isMegabundleWorker {
megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, env.header.Number, env.header.Time)
log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err)
if err != nil {
return err // no valid megabundle for this relay, nothing to do
}
// Flashbots bundle merging duplicates work by simulating TXes and then committing them once more.
// Megabundles API focuses on speed and runs everything in one cycle.
coinbaseBalanceBefore := env.state.GetBalance(env.coinbase)
if err := w.commitBundle(env, megabundle.Txs, interrupt); err != nil {
log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "err", err)
return err
}
var txStatuses = map[common.Hash]bool{}
for _, receipt := range env.receipts {
txStatuses[receipt.TxHash] = receipt.Status == types.ReceiptStatusSuccessful
}
for _, tx := range megabundle.Txs {
status, ok := txStatuses[tx.Hash()]
if !ok {
log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash())
return errors.New("no tx receipt after megabundle simulation")
}
if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) {
log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash())
return errors.New("megabundle contains failing tx")
}
}
coinbaseBalanceAfter := env.state.GetBalance(env.coinbase)
coinbaseDelta := big.NewInt(0).Sub(coinbaseBalanceAfter, coinbaseBalanceBefore)
env.profit = coinbaseDelta
log.Info("Megabundle processed", "relay", w.flashbots.relayAddr, "totalProfit", ethIntToFloat(env.profit))
}

if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt); err != nil {
Expand Down Expand Up @@ -1466,7 +1501,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
// If we're post merge, just ignore
if !w.isTTDReached(block.Header()) {
select {
case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles}:
case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles, isMegabundle: w.flashbots.isMegabundleWorker}:
w.unconfirmed.Shift(block.NumberU64() - 1)

fees := totalFees(block, env.receipts)
Expand Down

0 comments on commit 6fff7b4

Please sign in to comment.