eth/filters: avoid block body retrieval when no matching logs #25199

Merged
merged 7 commits into from
Feb 13, 2023
Changes from all commits
7 changes: 7 additions & 0 deletions accounts/abi/bind/backends/simulated.go
@@ -872,6 +872,13 @@ func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*t
return fb.bc.GetHeaderByHash(hash), nil
}

func (fb *filterBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
if body := fb.bc.GetBody(hash); body != nil {
return body, nil
}
return nil, errors.New("block body not found")
}

func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}
15 changes: 3 additions & 12 deletions core/rawdb/accessors_chain.go
@@ -714,9 +714,9 @@ func deriveLogFields(receipts []*receiptLogs, hash common.Hash, number uint64, t
return nil
}

// ReadLogs retrieves the logs for all transactions in a block. The log fields
// are populated with metadata. In case the receipts or the block body
// are not found, a nil is returned.
// ReadLogs retrieves the logs for all transactions in a block. If the
// receipts are not found, nil is returned.
// Note: ReadLogs does not derive unstored log fields.
func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64, config *params.ChainConfig) [][]*types.Log {
// Retrieve the flattened receipt slice
data := ReadReceiptsRLP(db, hash, number)
@@ -729,15 +729,6 @@ func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64, config *params.C
return nil
}

body := ReadBody(db, hash, number)
if body == nil {
log.Error("Missing body but have receipt", "hash", hash, "number", number)
return nil
}
if err := deriveLogFields(receipts, hash, number, body.Transactions); err != nil {
log.Error("Failed to derive block receipts fields", "hash", hash, "number", number, "err", err)
return nil
}
Review note: need to check for other uses of ReadLogs, which could be affected by not deriving the block context fields anymore.

logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
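Since ReadLogs now returns logs without the derived block context, callers that need those fields must fill them in themselves, which is what the filter system's cache does later in this diff. A minimal sketch of that derivation step, assuming the caller already knows the block hash and number (the package and helper name are illustrative, not part of the PR):

package example // illustrative only, not part of go-ethereum

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// deriveLogContext fills in the per-block fields that ReadLogs no longer
// derives. TxHash is deliberately left zero: resolving it needs the block
// body, which is exactly the retrieval this PR tries to avoid.
func deriveLogContext(grouped [][]*types.Log, hash common.Hash, number uint64) []*types.Log {
	flat := make([]*types.Log, 0)
	var logIdx uint
	for txIdx, txLogs := range grouped {
		for _, l := range txLogs {
			l.BlockHash = hash
			l.BlockNumber = number
			l.TxIndex = uint(txIdx)
			l.Index = logIdx
			logIdx++
			flat = append(flat, l)
		}
	}
	return flat
}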
4 changes: 0 additions & 4 deletions core/rawdb/accessors_chain_test.go
@@ -750,10 +750,6 @@ func TestReadLogs(t *testing.T) {
t.Fatalf("unexpected number of logs[1] returned, have %d want %d", have, want)
}

// Fill in log fields so we can compare their rlp encoding
if err := types.Receipts(receipts).DeriveFields(params.TestChainConfig, hash, 0, body.Transactions); err != nil {
t.Fatal(err)
}
for i, pr := range receipts {
for j, pl := range pr.Logs {
rlpHave, err := rlp.EncodeToBytes(newFullLogRLP(logs[i][j]))
11 changes: 11 additions & 0 deletions eth/api_backend.go
@@ -135,6 +135,17 @@ func (b *EthAPIBackend) BlockByHash(ctx context.Context, hash common.Hash) (*typ
return b.eth.blockchain.GetBlockByHash(hash), nil
}

// GetBody returns the body of a block. It does not resolve special block numbers.
func (b *EthAPIBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
if number < 0 || hash == (common.Hash{}) {
return nil, errors.New("invalid arguments; expect hash and no special block numbers")
}
if body := b.eth.blockchain.GetBody(hash); body != nil {
return body, nil
}
return nil, errors.New("block body not found")
}

func (b *EthAPIBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) {
if blockNr, ok := blockNrOrHash.Number(); ok {
return b.BlockByNumber(ctx, blockNr)
68 changes: 30 additions & 38 deletions eth/filters/filter.go
@@ -104,7 +104,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
if header == nil {
return nil, errors.New("unknown block")
}
return f.blockLogs(ctx, header, false)
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
@@ -216,7 +216,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
if header == nil || err != nil {
return logs, err
}
found, err := f.blockLogs(ctx, header, true)
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
}
@@ -238,7 +238,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
if header == nil || err != nil {
return logs, err
}
found, err := f.blockLogs(ctx, header, false)
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
}
@@ -248,46 +248,46 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
}

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) {
// Fast track: no filtering criteria
if len(f.addresses) == 0 && len(f.topics) == 0 {
list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil, err
}
return flatten(list), nil
} else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) {
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) ([]*types.Log, error) {
if bloomFilter(header.Bloom, f.addresses, f.topics) {
return f.checkMatches(ctx, header)
}
return nil, nil
}

// checkMatches checks if the receipts belonging to the given header contain any log events that
// match the filter criteria. This function is called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) {
logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
hash := header.Hash()
// Logs in cache are partially filled with context data
// such as tx index, block hash, etc.
// Notably tx hash is NOT filled in because it needs
// access to block body data.
cached, err := f.sys.cachedLogElem(ctx, hash, header.Number.Uint64())
if err != nil {
return nil, err
}

unfiltered := flatten(logsList)
logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
if len(logs) > 0 {
// We have matching logs, check if we need to resolve full logs via the light client
if logs[0].TxHash == (common.Hash{}) {
receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil, err
}
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
unfiltered = append(unfiltered, receipt.Logs...)
}
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
logs := filterLogs(cached.logs, nil, nil, f.addresses, f.topics)
if len(logs) == 0 {
return nil, nil
}
// Most backends will deliver un-derived logs, but check nevertheless.
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
return logs, nil
}
return nil, nil

body, err := f.sys.cachedGetBody(ctx, cached, hash, header.Number.Uint64())
if err != nil {
return nil, err
}
for i, log := range logs {
// Copy log not to modify cache elements
logcopy := *log
logcopy.TxHash = body.Transactions[logcopy.TxIndex].Hash()
logs[i] = &logcopy
}
return logs, nil
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
@@ -377,11 +377,3 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
}
return true
}

func flatten(list [][]*types.Log) []*types.Log {
var flat []*types.Log
for _, logs := range list {
flat = append(flat, logs...)
}
return flat
}
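Taken together, the new path in this file checks the cheap signals first and only touches the block body once there are confirmed matches that still lack a transaction hash. Below is a condensed, standalone sketch of that control flow; the cache and backend lookups are reduced to hypothetical function parameters, so this is an illustration of the idea rather than the PR's actual API.

package example // illustrative only, not part of go-ethereum

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// matchBlock mirrors the blockLogs/checkMatches split: bloom check first,
// then cached (partially derived) logs, and the body lookup only when there
// are confirmed matches missing a TxHash. getCachedLogs, filterLogs and
// getBody are assumed stand-ins for cachedLogElem, filterLogs and cachedGetBody.
func matchBlock(
	ctx context.Context,
	header *types.Header,
	bloomMatch func(types.Bloom) bool,
	getCachedLogs func(context.Context, common.Hash, uint64) ([]*types.Log, error),
	filterLogs func([]*types.Log) []*types.Log,
	getBody func(context.Context, common.Hash, uint64) (*types.Body, error),
) ([]*types.Log, error) {
	if !bloomMatch(header.Bloom) {
		return nil, nil // bloom says no: neither logs nor body are fetched
	}
	hash, number := header.Hash(), header.Number.Uint64()
	cached, err := getCachedLogs(ctx, hash, number)
	if err != nil {
		return nil, err
	}
	logs := filterLogs(cached)
	if len(logs) == 0 {
		return nil, nil // bloom false positive: still no body retrieval
	}
	if logs[0].TxHash != (common.Hash{}) {
		return logs, nil // backend already delivered derived logs
	}
	// Only now pay for the block body, solely to resolve transaction hashes.
	body, err := getBody(ctx, hash, number)
	if err != nil {
		return nil, err
	}
	for i, l := range logs {
		cp := *l // copy so cached entries stay untouched
		cp.TxHash = body.Transactions[cp.TxIndex].Hash()
		logs[i] = &cp
	}
	return logs, nil
}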
117 changes: 77 additions & 40 deletions eth/filters/filter_system.go
@@ -22,6 +22,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
@@ -58,6 +59,7 @@ type Backend interface {
ChainDb() ethdb.Database
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)
@@ -77,7 +79,7 @@
// FilterSystem holds resources shared by all filters.
type FilterSystem struct {
backend Backend
logsCache *lru.Cache[common.Hash, [][]*types.Log]
logsCache *lru.Cache[common.Hash, *logCacheElem]
cfg *Config
}

@@ -86,13 +88,18 @@ func NewFilterSystem(backend Backend, config Config) *FilterSystem {
config = config.withDefaults()
return &FilterSystem{
backend: backend,
logsCache: lru.NewCache[common.Hash, [][]*types.Log](config.LogCacheSize),
logsCache: lru.NewCache[common.Hash, *logCacheElem](config.LogCacheSize),
cfg: &config,
}
}

// cachedGetLogs loads block logs from the backend and caches the result.
func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) {
type logCacheElem struct {
logs []*types.Log
body atomic.Pointer[types.Body]
}

// cachedLogElem loads block logs from the backend and caches the result.
func (sys *FilterSystem) cachedLogElem(ctx context.Context, blockHash common.Hash, number uint64) (*logCacheElem, error) {
cached, ok := sys.logsCache.Get(blockHash)
if ok {
return cached, nil
@@ -105,8 +112,35 @@ func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Has
if logs == nil {
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString())
}
sys.logsCache.Add(blockHash, logs)
return logs, nil
// Database logs are un-derived.
// Fill in whatever we can (txHash is inaccessible at this point).
flattened := make([]*types.Log, 0)
var logIdx uint
for i, txLogs := range logs {
for _, log := range txLogs {
log.BlockHash = blockHash
log.BlockNumber = number
log.TxIndex = uint(i)
log.Index = logIdx
logIdx++
flattened = append(flattened, log)
}
}
elem := &logCacheElem{logs: flattened}
sys.logsCache.Add(blockHash, elem)
return elem, nil
}

func (sys *FilterSystem) cachedGetBody(ctx context.Context, elem *logCacheElem, hash common.Hash, number uint64) (*types.Body, error) {
if body := elem.body.Load(); body != nil {
return body, nil
}
body, err := sys.backend.GetBody(ctx, hash, rpc.BlockNumber(number))
if err != nil {
return nil, err
}
elem.body.Store(body)
return body, nil
}

// Type determines the kind of filter and is used to put the filter in to
@@ -431,6 +465,12 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 {
continue
}
if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 {
continue
}
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
@@ -474,42 +514,39 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func

// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
if bloomFilter(header.Bloom, addresses, topics) {
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
var unfiltered []*types.Log
for _, logs := range logsList {
for _, log := range logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
// We have matching but non-derived logs
receipts, err := es.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil
}
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
for _, log := range receipt.Logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
}
logs = filterLogs(unfiltered, nil, nil, addresses, topics)
}
if !bloomFilter(header.Bloom, addresses, topics) {
return nil
}
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
unfiltered := append([]*types.Log{}, cached.logs...)
for i, log := range unfiltered {
// Don't modify in-cache elements
logcopy := *log
@fjl (Contributor) commented on Dec 13, 2022:
This copy is wasteful because we might have to copy the logs slice again in filterLogs.

It would be good to clearly define/document that filterLogs copies its input slice (and elements). Then you can just assign the Removed status after filtering without making a copy, in the loop where you assign the TxHash.

logcopy.Removed = remove
// Swap copy in-place
unfiltered[i] = &logcopy
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
// Txhash is already resolved
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
return logs
}
return nil
// Resolve txhash
body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
for _, log := range logs {
// logs are already copied, safe to modify
log.TxHash = body.Transactions[log.TxIndex].Hash()
}
return logs
}

// eventLoop (un)installs filters and processes mux events.
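The body fetched by cachedGetBody is memoized on the cache entry through an atomic.Pointer, so concurrent filters over the same block share one body without putting a mutex on the hot path. A minimal sketch of that idiom in isolation follows; the type and the fetch callback are illustrative stand-ins for logCacheElem and Backend.GetBody, not the actual implementation.

package example // illustrative only, not part of go-ethereum

import (
	"context"
	"sync/atomic" // atomic.Pointer requires Go 1.19+

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// cachedBlock mirrors logCacheElem: logs are stored eagerly, the body only
// once somebody actually needs it.
type cachedBlock struct {
	logs []*types.Log
	body atomic.Pointer[types.Body]
}

// bodyFor returns the memoized body, fetching it on first use. Concurrent
// callers may race and fetch twice; the duplicate fetch is harmless because
// Store simply overwrites the pointer with an equivalent body.
func (c *cachedBlock) bodyFor(ctx context.Context, hash common.Hash,
	fetch func(context.Context, common.Hash) (*types.Body, error)) (*types.Body, error) {
	if body := c.body.Load(); body != nil {
		return body, nil
	}
	body, err := fetch(ctx, hash)
	if err != nil {
		return nil, err
	}
	c.body.Store(body)
	return body, nil
}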