Skip to content

Commit

Permalink
core/types: support for optional blob sidecar in BlobTx (#27841)
Browse files Browse the repository at this point in the history
This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way
of keeping the blob sidecar within types.Transaction. It's better this way because most
code in go-ethereum does not care about blob transactions, and probably never will. This
will start mattering especially on the client side of RPC, where all APIs are based on
types.Transaction. Users need to be able to use the same signing flows they already
have.

However, since blobs are only allowed in some places but not others, we will now need to
add checks to avoid creating invalid blocks. I'm still trying to figure out the best place
to do some of these. The way I have it currently is as follows:

- In block validation (import), txs are verified not to have a blob sidecar.
- In miner, we strip off the sidecar when committing the transaction into the block.
- In TxPool validation, txs must have a sidecar to be added into the blobpool.
  - Note there is a special case here: when transactions are re-added because of a chain
    reorg, we cannot use the transactions gathered from the old chain blocks as-is,
    because they will be missing their blobs. This was previously handled by storing the
    blobs into the 'blobpool limbo'. The code has now changed to store the full
    transaction in the limbo instead, but it might be confusing for code readers why we're
    not simply adding the types.Transaction we already have.

Code changes summary:

- txpool.Transaction removed and all uses replaced by types.Transaction again
- blobpool now stores types.Transaction instead of defining its own blobTx format for storage
- the blobpool limbo now stores types.Transaction instead of storing only the blobs
- checks to validate the presence/absence of the blob sidecar added in certain critical places
  • Loading branch information
fjl authored Aug 14, 2023
1 parent 6886006 commit 2a6beb6
Show file tree
Hide file tree
Showing 32 changed files with 626 additions and 373 deletions.
16 changes: 14 additions & 2 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash {
return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash)
}

// Withdrawals are present after the Shanghai fork.
if header.WithdrawalsHash != nil {
// Withdrawals list must be present in body after Shanghai.
Expand All @@ -81,14 +82,23 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
// Withdrawals are not allowed prior to Shanghai fork
return errors.New("withdrawals present in block body")
}

// Blob transactions may be present after the Cancun fork.
var blobs int
for _, tx := range block.Transactions() {
for i, tx := range block.Transactions() {
// Count the number of blobs to validate against the header's blobGasUsed
blobs += len(tx.BlobHashes())

// If the tx is a blob tx, it must NOT have a sidecar attached to be valid in a block.
if tx.BlobTxSidecar() != nil {
return fmt.Errorf("unexpected blob sidecar in transaction at index %d", i)
}

// The individual checks for blob validity (version-check + not empty)
// happens in the state_transition check.
// happens in StateTransition.
}

// Check blob gas usage.
if header.BlobGasUsed != nil {
if want := *header.BlobGasUsed / params.BlobTxBlobGasPerBlob; uint64(blobs) != want { // div because the header is surely good vs the body might be bloated
return fmt.Errorf("blob gas used mismatch (header %v, calculated %v)", *header.BlobGasUsed, blobs*params.BlobTxBlobGasPerBlob)
Expand All @@ -98,6 +108,8 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
return errors.New("data blobs present in block body")
}
}

// Ancestor block must be known.
if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
Expand Down
29 changes: 20 additions & 9 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,19 +1085,30 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
ancientReceipts, liveReceipts []types.Receipts
)
// Do a sanity check that the provided chain is actually ordered and linked
for i := 0; i < len(blockChain); i++ {
for i, block := range blockChain {
if i != 0 {
if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(),
"prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash())
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, blockChain[i-1].NumberU64(),
blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])
prev := blockChain[i-1]
if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() {
log.Error("Non contiguous receipt insert",
"number", block.Number(), "hash", block.Hash(), "parent", block.ParentHash(),
"prevnumber", prev.Number(), "prevhash", prev.Hash())
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])",
i-1, prev.NumberU64(), prev.Hash().Bytes()[:4],
i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
}
}
if blockChain[i].NumberU64() <= ancientLimit {
ancientBlocks, ancientReceipts = append(ancientBlocks, blockChain[i]), append(ancientReceipts, receiptChain[i])
if block.NumberU64() <= ancientLimit {
ancientBlocks, ancientReceipts = append(ancientBlocks, block), append(ancientReceipts, receiptChain[i])
} else {
liveBlocks, liveReceipts = append(liveBlocks, blockChain[i]), append(liveReceipts, receiptChain[i])
liveBlocks, liveReceipts = append(liveBlocks, block), append(liveReceipts, receiptChain[i])
}

// Here we also validate that blob transactions in the block do not contain a sidecar.
// While the sidecar does not affect the block hash / tx hash, sending blobs within a block is not allowed.
for txIndex, tx := range block.Transactions() {
if tx.Type() == types.BlobTxType && tx.BlobTxSidecar() != nil {
return 0, fmt.Errorf("block #%d contains unexpected blob sidecar in tx at index %d", block.NumberU64(), txIndex)
}
}
}

Expand Down
75 changes: 34 additions & 41 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package blobpool

import (
"container/heap"
"errors"
"fmt"
"math"
"math/big"
Expand All @@ -35,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -83,16 +83,6 @@ const (
limboedTransactionStore = "limbo"
)

// blobTx is a wrapper around types.BlobTx which also contains the literal blob
// data along with all the transaction metadata.
type blobTx struct {
Tx *types.Transaction

Blobs []kzg4844.Blob
Commits []kzg4844.Commitment
Proofs []kzg4844.Proof
}

// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
// schedule the blob transactions into the following blocks. Only ever add the
// bare minimum needed fields to keep the size down (and thus number of entries
Expand Down Expand Up @@ -455,22 +445,27 @@ func (p *BlobPool) Close() error {
// parseTransaction is a callback method on pool creation that gets called for
// each transaction on disk to create the in-memory metadata index.
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
item := new(blobTx)
if err := rlp.DecodeBytes(blob, item); err != nil {
tx := new(types.Transaction)
if err := rlp.DecodeBytes(blob, tx); err != nil {
// This path is impossible unless the disk data representation changes
// across restarts. For that ever unprobable case, recover gracefully
// by ignoring this data entry.
log.Error("Failed to decode blob pool entry", "id", id, "err", err)
return err
}
meta := newBlobTxMeta(id, size, item.Tx)
if tx.BlobTxSidecar() == nil {
log.Error("Missing sidecar in blob pool entry", "id", id, "hash", tx.Hash())
return errors.New("missing blob sidecar")
}

meta := newBlobTxMeta(id, size, tx)

sender, err := p.signer.Sender(item.Tx)
sender, err := p.signer.Sender(tx)
if err != nil {
// This path is impossible unless the signature validity changes across
// restarts. For that ever unprobable case, recover gracefully by ignoring
// this data entry.
log.Error("Failed to recover blob tx sender", "id", id, "hash", item.Tx.Hash(), "err", err)
log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err)
return err
}
if _, ok := p.index[sender]; !ok {
Expand Down Expand Up @@ -718,17 +713,17 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
item := new(blobTx)
if err = rlp.DecodeBytes(data, item); err != nil {
var tx types.Transaction
if err = rlp.DecodeBytes(data, tx); err != nil {
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
block, ok := inclusions[item.Tx.Hash()]
block, ok := inclusions[tx.Hash()]
if !ok {
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
return
}
if err := p.limbo.push(item.Tx.Hash(), block, item.Blobs, item.Commits, item.Proofs); err != nil {
if err := p.limbo.push(&tx, block); err != nil {
log.Warn("Failed to offload blob tx into limbo", "err", err)
return
}
Expand Down Expand Up @@ -760,7 +755,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
for addr, txs := range reinject {
// Blindly push all the lost transactions back into the pool
for _, tx := range txs {
p.reinject(addr, tx)
p.reinject(addr, tx.Hash())
}
// Recheck the account's pooled transactions to drop included and
// invalidated one
Expand Down Expand Up @@ -920,16 +915,19 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
// Note, the method will not initialize the eviction cache values as those will
// be done once for all transactions belonging to an account after all individual
// transactions are injected back into the pool.
func (p *BlobPool) reinject(addr common.Address, tx *types.Transaction) {
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
blobs, commits, proofs, err := p.limbo.pull(tx.Hash())
tx, err := p.limbo.pull(txhash)
if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return
}
// Serialize the transaction back into the primary datastore
blob, err := rlp.EncodeToBytes(&blobTx{Tx: tx, Blobs: blobs, Commits: commits, Proofs: proofs})
// TODO: seems like an easy optimization here would be getting the serialized tx
// from limbo instead of re-serializing it here.

// Serialize the transaction back into the primary datastore.
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return
Expand All @@ -939,9 +937,9 @@ func (p *BlobPool) reinject(addr common.Address, tx *types.Transaction) {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
return
}

// Update the indixes and metrics
meta := newBlobTxMeta(id, p.store.Size(id), tx)

if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
Expand Down Expand Up @@ -1023,7 +1021,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (p *BlobPool) validateTx(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) error {
func (p *BlobPool) validateTx(tx *types.Transaction) error {
// Ensure the transaction adheres to basic pool filters (type, size, tip) and
// consensus rules
baseOpts := &txpool.ValidationOptions{
Expand All @@ -1032,7 +1030,7 @@ func (p *BlobPool) validateTx(tx *types.Transaction, blobs []kzg4844.Blob, commi
MaxSize: txMaxSize,
MinTip: p.gasTip.ToBig(),
}
if err := txpool.ValidateTransaction(tx, blobs, commits, proofs, p.head, p.signer, baseOpts); err != nil {
if err := txpool.ValidateTransaction(tx, p.head, p.signer, baseOpts); err != nil {
return err
}
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
Expand Down Expand Up @@ -1117,7 +1115,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
}

// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *txpool.Transaction {
func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
// Track the amount of time waiting to retrieve a fully resolved blob tx from
// the pool and the amount of time actually spent on pulling the data from disk.
getStart := time.Now()
Expand All @@ -1139,32 +1137,27 @@ func (p *BlobPool) Get(hash common.Hash) *txpool.Transaction {
log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err)
return nil
}
item := new(blobTx)
item := new(types.Transaction)
if err = rlp.DecodeBytes(data, item); err != nil {
log.Error("Blobs corrupted for traced transaction", "hash", hash, "id", id, "err", err)
return nil
}
return &txpool.Transaction{
Tx: item.Tx,
BlobTxBlobs: item.Blobs,
BlobTxCommits: item.Commits,
BlobTxProofs: item.Proofs,
}
return item
}

// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restictions).
func (p *BlobPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
errs := make([]error, len(txs))
for i, tx := range txs {
errs[i] = p.add(tx.Tx, tx.BlobTxBlobs, tx.BlobTxCommits, tx.BlobTxProofs)
errs[i] = p.add(tx)
}
return errs
}

// Add inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restictions).
func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) (err error) {
func (p *BlobPool) add(tx *types.Transaction) (err error) {
// The blob pool blocks on adding a transaction. This is because blob txs are
// only even pulled form the network, so this method will act as the overload
// protection for fetches.
Expand All @@ -1178,7 +1171,7 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz
}(time.Now())

// Ensure the transaction is valid from all perspectives
if err := p.validateTx(tx, blobs, commits, proofs); err != nil {
if err := p.validateTx(tx); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
return err
}
Expand All @@ -1203,7 +1196,7 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz
}
// Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices
blob, err := rlp.EncodeToBytes(&blobTx{Tx: tx, Blobs: blobs, Commits: commits, Proofs: proofs})
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err
Expand Down
Loading

0 comments on commit 2a6beb6

Please sign in to comment.