diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 3833316be4461..08a7260d264f1 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -555,6 +555,13 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ return core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash)), nil } +func (fb *filterBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) { + if body := fb.bc.GetBody(hash); body != nil { + return body, nil + } + return nil, errors.New("block body not found") +} + func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { return fb.backend.pendingBlock, fb.backend.pendingReceipts } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 04b30b2a2206a..bb79f9b71a42d 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -29,6 +29,22 @@ import ( "github.com/XinFinOrg/XDPoSChain/rlp" ) +// WriteCanonicalHash stores the hash assigned to a canonical block number. +func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil { + log.Crit("Failed to store number to hash mapping", "err", err) + } +} + +// WriteHeaderNumber stores the hash->number mapping. +func WriteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + key := headerNumberKey(hash) + enc := encodeBlockNumber(number) + if err := db.Put(key, enc); err != nil { + log.Crit("Failed to store hash to number mapping", "err", err) + } +} + // ReadHeaderNumber returns the header number assigned to a hash. func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { data, _ := db.Get(headerNumberKey(hash)) @@ -39,6 +55,13 @@ func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { return &number } +// WriteHeadBlockHash stores the head block's hash. +func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Put(headBlockKey, hash.Bytes()); err != nil { + log.Crit("Failed to store last block's hash", "err", err) + } +} + // ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { // First try to look up the data in ancient database. Extra hash @@ -100,6 +123,27 @@ func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *t WriteBodyRLP(db, hash, number, data) } +// WriteHeader stores a block header into the database and also stores the hash- +// to-number mapping. +func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) { + var ( + hash = header.Hash() + number = header.Number.Uint64() + ) + // Write the hash -> number mapping + WriteHeaderNumber(db, hash, number) + + // Write the encoded header + data, err := rlp.EncodeToBytes(header) + if err != nil { + log.Crit("Failed to RLP encode header", "err", err) + } + key := headerKey(number, hash) + if err := db.Put(key, data); err != nil { + log.Crit("Failed to store header", "err", err) + } +} + // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { // First try to look up the data in ancient database. Extra hash @@ -195,6 +239,12 @@ func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rec } } +// WriteBlock serializes a block into the database, header and body separately. +func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { + WriteBody(db, block.Hash(), block.NumberU64(), block.Body()) + WriteHeader(db, block.Header()) +} + // storedReceiptRLP is the storage encoding of a receipt. // Re-definition in core/types/receipt.go. type storedReceiptRLP struct { @@ -248,9 +298,9 @@ func deriveLogFields(receipts []*receiptLogs, hash common.Hash, number uint64, t return nil } -// ReadLogs retrieves the logs for all transactions in a block. The log fields -// are populated with metadata. In case the receipts or the block body -// are not found, a nil is returned. +// ReadLogs retrieves the logs for all transactions in a block. In case +// receipts is not found, a nil is returned. +// Note: ReadLogs does not derive unstored log fields. func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64) [][]*types.Log { // Retrieve the flattened receipt slice data := ReadReceiptsRLP(db, hash, number) @@ -263,15 +313,6 @@ func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64) [][]*types.Log { return nil } - body := ReadBody(db, hash, number) - if body == nil { - log.Error("Missing body but have receipt", "hash", hash, "number", number) - return nil - } - if err := deriveLogFields(receipts, hash, number, body.Transactions); err != nil { - log.Error("Failed to derive block receipts fields", "hash", hash, "number", number, "err", err) - return nil - } logs := make([][]*types.Log, len(receipts)) for i, receipt := range receipts { logs[i] = receipt.Logs diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 1b6527dbd2e18..a514c5857fdca 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -116,10 +116,6 @@ func TestReadLogs(t *testing.T) { t.Fatalf("unexpected number of logs[1] returned, have %d want %d", have, want) } - // Fill in log fields so we can compare their rlp encoding - if err := types.Receipts(receipts).DeriveFields(params.TestChainConfig, hash, 0, body.Transactions); err != nil { - t.Fatal(err) - } for i, pr := range receipts { for j, pl := range pr.Logs { rlpHave, err := rlp.EncodeToBytes(newFullLogRLP(logs[i][j])) diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index d9d567e578285..0a08eeed69ed7 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -25,10 +25,13 @@ import ( // The fields below define the low level database schema prefixing. var ( + headBlockKey = []byte("LastBlock") // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). + headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header + headerHashSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + headerHashSuffix -> hash headerNumberPrefix = []byte("H") // headerNumberPrefix + hash -> num (uint64 big endian) - blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body + blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts ) @@ -50,6 +53,16 @@ func encodeBlockNumber(number uint64) []byte { return enc } +// headerKey = headerPrefix + num (uint64 big endian) + hash +func headerKey(number uint64, hash common.Hash) []byte { + return append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix +func headerHashKey(number uint64) []byte { + return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...) +} + // headerNumberKey = headerNumberPrefix + hash func headerNumberKey(hash common.Hash) []byte { return append(headerNumberPrefix, hash.Bytes()...) diff --git a/eth/api_backend.go b/eth/api_backend.go index b2aaf2fbe8b08..5b18a35002310 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -160,6 +160,17 @@ func (b *EthApiBackend) BlockByHash(ctx context.Context, hash common.Hash) (*typ return b.eth.blockchain.GetBlockByHash(hash), nil } +// GetBody returns body of a block. It does not resolve special block numbers. +func (b *EthApiBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) { + if number < 0 || hash == (common.Hash{}) { + return nil, errors.New("invalid arguments; expect hash and no special block numbers") + } + if body := b.eth.blockchain.GetBody(hash); body != nil { + return body, nil + } + return nil, errors.New("block body not found") +} + func (b *EthApiBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) { if blockNr, ok := blockNrOrHash.Number(); ok { return b.BlockByNumber(ctx, blockNr) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 1fa21bb7e0d95..3defe854f86f8 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -104,7 +104,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { if header == nil { return nil, errors.New("unknown block") } - return f.blockLogs(ctx, header, false) + return f.blockLogs(ctx, header) } // Short-cut if all we care about is pending logs if f.begin == rpc.PendingBlockNumber.Int64() { @@ -192,7 +192,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err if header == nil || err != nil { return logs, err } - found, err := f.blockLogs(ctx, header, true) + found, err := f.blockLogs(ctx, header) if err != nil { return logs, err } @@ -214,7 +214,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e if header == nil || err != nil { return logs, err } - found, err := f.blockLogs(ctx, header, false) + found, err := f.blockLogs(ctx, header) if err != nil { return logs, err } @@ -224,15 +224,8 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e } // blockLogs returns the logs matching the filter criteria within a single block. -func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) { - // Fast track: no filtering criteria - if len(f.addresses) == 0 && len(f.topics) == 0 { - list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) - if err != nil { - return nil, err - } - return flatten(list), nil - } else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) { +func (f *Filter) blockLogs(ctx context.Context, header *types.Header) ([]*types.Log, error) { + if bloomFilter(header.Bloom, f.addresses, f.topics) { return f.checkMatches(ctx, header) } return nil, nil @@ -240,30 +233,37 @@ func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. +// skipFilter signals all logs of the given block are requested. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) { - logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) + hash := header.Hash() + // Logs in cache are partially filled with context data + // such as tx index, block hash, etc. + // Notably tx hash is NOT filled in because it needs + // access to block body data. + cached, err := f.sys.cachedLogElem(ctx, hash, header.Number.Uint64()) if err != nil { return nil, err } - - unfiltered := flatten(logsList) - logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics) - if len(logs) > 0 { - // We have matching logs, check if we need to resolve full logs via the light client - if logs[0].TxHash == (common.Hash{}) { - receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash()) - if err != nil { - return nil, err - } - unfiltered = unfiltered[:0] - for _, receipt := range receipts { - unfiltered = append(unfiltered, receipt.Logs...) - } - logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) - } + logs := filterLogs(cached.logs, nil, nil, f.addresses, f.topics) + if len(logs) == 0 { + return nil, nil + } + // Most backends will deliver un-derived logs, but check nevertheless. + if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) { return logs, nil } - return nil, nil + + body, err := f.sys.cachedGetBody(ctx, cached, hash, header.Number.Uint64()) + if err != nil { + return nil, err + } + for i, log := range logs { + // Copy log not to modify cache elements + logcopy := *log + logcopy.TxHash = body.Transactions[logcopy.TxIndex].Hash() + logs[i] = &logcopy + } + return logs, nil } // pendingLogs returns the logs matching the filter criteria within the pending block. @@ -353,11 +353,3 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo } return true } - -func flatten(list [][]*types.Log) []*types.Log { - var flat []*types.Log - for _, logs := range list { - flat = append(flat, logs...) - } - return flat -} diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 9de01e4a4f892..ad592154efa92 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" ethereum "github.com/XinFinOrg/XDPoSChain" @@ -57,6 +58,7 @@ type Backend interface { ChainDb() ethdb.Database HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) + GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) PendingBlockAndReceipts() (*types.Block, types.Receipts) @@ -74,7 +76,7 @@ type Backend interface { // FilterSystem holds resources shared by all filters. type FilterSystem struct { backend Backend - logsCache *lru.Cache[common.Hash, [][]*types.Log] + logsCache *lru.Cache[common.Hash, *logCacheElem] cfg *Config } @@ -83,13 +85,18 @@ func NewFilterSystem(backend Backend, config Config) *FilterSystem { config = config.withDefaults() return &FilterSystem{ backend: backend, - logsCache: lru.NewCache[common.Hash, [][]*types.Log](config.LogCacheSize), + logsCache: lru.NewCache[common.Hash, *logCacheElem](config.LogCacheSize), cfg: &config, } } -// cachedGetLogs loads block logs from the backend and caches the result. -func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { +type logCacheElem struct { + logs []*types.Log + body atomic.Pointer[types.Body] +} + +// cachedLogElem loads block logs from the backend and caches the result. +func (sys *FilterSystem) cachedLogElem(ctx context.Context, blockHash common.Hash, number uint64) (*logCacheElem, error) { cached, ok := sys.logsCache.Get(blockHash) if ok { return cached, nil @@ -102,8 +109,35 @@ func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Has if logs == nil { return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString()) } - sys.logsCache.Add(blockHash, logs) - return logs, nil + // Database logs are un-derived. + // Fill in whatever we can (txHash is inaccessible at this point). + flattened := make([]*types.Log, 0) + var logIdx uint + for i, txLogs := range logs { + for _, log := range txLogs { + log.BlockHash = blockHash + log.BlockNumber = number + log.TxIndex = uint(i) + log.Index = logIdx + logIdx++ + flattened = append(flattened, log) + } + } + elem := &logCacheElem{logs: flattened} + sys.logsCache.Add(blockHash, elem) + return elem, nil +} + +func (sys *FilterSystem) cachedGetBody(ctx context.Context, elem *logCacheElem, hash common.Hash, number uint64) (*types.Body, error) { + if body := elem.body.Load(); body != nil { + return body, nil + } + body, err := sys.backend.GetBody(ctx, hash, rpc.BlockNumber(number)) + if err != nil { + return nil, err + } + elem.body.Store(body) + return body, nil } // Type determines the kind of filter and is used to put the filter in to @@ -433,6 +467,12 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) if es.lightMode && len(filters[LogsSubscription]) > 0 { es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) { for _, f := range filters[LogsSubscription] { + if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 { + continue + } + if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 { + continue + } if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { f.logs <- matchedLogs } @@ -476,42 +516,39 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func // filter logs of a single header in light client mode func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log { - if bloomFilter(header.Bloom, addresses, topics) { - // Get the logs of the block - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) - if err != nil { - return nil - } - var unfiltered []*types.Log - for _, logs := range logsList { - for _, log := range logs { - logcopy := *log - logcopy.Removed = remove - unfiltered = append(unfiltered, &logcopy) - } - } - logs := filterLogs(unfiltered, nil, nil, addresses, topics) - if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) { - // We have matching but non-derived logs - receipts, err := es.backend.GetReceipts(ctx, header.Hash()) - if err != nil { - return nil - } - unfiltered = unfiltered[:0] - for _, receipt := range receipts { - for _, log := range receipt.Logs { - logcopy := *log - logcopy.Removed = remove - unfiltered = append(unfiltered, &logcopy) - } - } - logs = filterLogs(unfiltered, nil, nil, addresses, topics) - } + if !bloomFilter(header.Bloom, addresses, topics) { + return nil + } + // Get the logs of the block + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64()) + if err != nil { + return nil + } + unfiltered := append([]*types.Log{}, cached.logs...) + for i, log := range unfiltered { + // Don't modify in-cache elements + logcopy := *log + logcopy.Removed = remove + // Swap copy in-place + unfiltered[i] = &logcopy + } + logs := filterLogs(unfiltered, nil, nil, addresses, topics) + // Txhash is already resolved + if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) { return logs } - return nil + // Resolve txhash + body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64()) + if err != nil { + return nil + } + for _, log := range logs { + // logs are already copied, safe to modify + log.TxHash = body.Transactions[log.TxIndex].Hash() + } + return logs } // eventLoop (un)installs filters and processes mux events. diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index da36c01843b6b..2610376d3bf34 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -18,6 +18,7 @@ package filters import ( "context" + "errors" "fmt" "math/big" "math/rand" @@ -33,6 +34,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/core/bloombits" "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/params" @@ -72,6 +74,13 @@ func (b *testBackend) HeaderByHash(ctx context.Context, blockHash common.Hash) ( return core.GetHeader(b.db, blockHash, num), nil } +func (b *testBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) { + if body := rawdb.ReadBody(b.db, hash, uint64(number)); body != nil { + return body, nil + } + return nil, errors.New("block body not found") +} + func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) { number := core.GetBlockNumber(b.db, blockHash) return core.GetBlockReceipts(b.db, blockHash, number), nil @@ -592,6 +601,143 @@ func TestPendingLogsSubscription(t *testing.T) { } } +func TestLightFilterLogs(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, true) + signer = types.HomesteadSigner{} + + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") + secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") + thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") + notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") + firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") + + // posted twice, once as regular logs and once as pending logs. + allLogs = []*types.Log{ + // Block 1 + {Address: firstAddr, Topics: []common.Hash{}, Data: []byte{}, BlockNumber: 2, Index: 0}, + // Block 2 + {Address: firstAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 0}, + {Address: secondAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 1}, + {Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 3, Index: 2}, + // Block 3 + {Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 4, Index: 0}, + } + + testCases = []struct { + crit FilterCriteria + expected []*types.Log + id rpc.ID + }{ + // match all + 0: {FilterCriteria{}, allLogs, ""}, + // match none due to no matching addresses + 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""}, + // match logs based on addresses, ignore topics + 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, + // match logs based on addresses and topics + 3: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""}, + // all logs with block num >= 3 + 4: {FilterCriteria{FromBlock: big.NewInt(3), ToBlock: big.NewInt(5)}, allLogs[1:], ""}, + // all logs + 5: {FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(5)}, allLogs, ""}, + // all logs with 1>= block num <=2 and topic secondTopic + 6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(3), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, + } + + key, _ = crypto.GenerateKey() + addr = crypto.PubkeyToAddress(key.PublicKey) + genesis = &core.Genesis{Config: params.TestChainConfig, + Alloc: core.GenesisAlloc{ + addr: {Balance: big.NewInt(params.Ether)}, + }, + } + receipts = []*types.Receipt{{ + Logs: []*types.Log{allLogs[0]}, + }, { + Logs: []*types.Log{allLogs[1], allLogs[2], allLogs[3]}, + }, { + Logs: []*types.Log{allLogs[4]}, + }} + ) + + _, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 4, func(i int, b *core.BlockGen) { + if i == 0 { + return + } + receipts[i-1].Bloom = types.CreateBloom(types.Receipts{receipts[i-1]}) + b.AddUncheckedReceipt(receipts[i-1]) + tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{Nonce: uint64(i - 1), To: &common.Address{}, Value: big.NewInt(1000), Gas: params.TxGas, GasPrice: big.NewInt(2100), Data: nil}), signer, key) + b.AddTx(tx) + }) + for i, block := range blocks { + rawdb.WriteBlock(db, block) + rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) + rawdb.WriteHeadBlockHash(db, block.Hash()) + if i > 0 { + rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), []*types.Receipt{receipts[i-1]}) + } + } + // create all filters + for i := range testCases { + id, err := api.NewFilter(testCases[i].crit) + if err != nil { + t.Fatal(err) + } + testCases[i].id = id + } + + // raise events + time.Sleep(1 * time.Second) + for _, block := range blocks { + backend.chainFeed.Send(core.ChainEvent{Block: block, Hash: common.Hash{}, Logs: allLogs}) + } + + for i, tt := range testCases { + var fetched []*types.Log + timeout := time.Now().Add(1 * time.Second) + for { // fetch all expected logs + results, err := api.GetFilterChanges(tt.id) + if err != nil { + t.Fatalf("Unable to fetch logs: %v", err) + } + fetched = append(fetched, results.([]*types.Log)...) + if len(fetched) >= len(tt.expected) { + break + } + // check timeout + if time.Now().After(timeout) { + break + } + + time.Sleep(100 * time.Millisecond) + } + + if len(fetched) != len(tt.expected) { + t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) + return + } + + for l := range fetched { + if fetched[l].Removed { + t.Errorf("expected log not to be removed for log %d in case %d", l, i) + } + expected := *tt.expected[l] + blockNum := expected.BlockNumber - 1 + expected.BlockHash = blocks[blockNum].Hash() + expected.TxHash = blocks[blockNum].Transactions()[0].Hash() + if !reflect.DeepEqual(fetched[l], &expected) { + t.Errorf("invalid log on index %d for case %d", l, i) + } + } + } +} + // TestPendingTxFilterDeadlock tests if the event loop hangs when pending // txes arrive at the same time that one of multiple filters is timing out. // Please refer to #22131 for more details. diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index ad09ad95c00c8..a365e52780202 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -107,6 +107,7 @@ type Backend interface { // eth/filters needs to be initialized from this backend type, so methods needed by // it must also be included here. + GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) filters.Backend } diff --git a/les/api_backend.go b/les/api_backend.go index 680d81f18ed4f..876a1dcbe9f83 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -130,6 +130,10 @@ func (b *LesApiBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash r return nil, errors.New("invalid arguments; neither block nor hash specified") } +func (b *LesApiBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) { + return light.GetBody(ctx, b.eth.odr, hash, uint64(number)) +} + func (b *LesApiBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { return nil, nil } diff --git a/light/odr_util.go b/light/odr_util.go index 236f5c238220b..d7ebd6739f756 100644 --- a/light/odr_util.go +++ b/light/odr_util.go @@ -124,7 +124,7 @@ func GetBlock(ctx context.Context, odr OdrBackend, hash common.Hash, number uint } // GetBlockReceipts retrieves the receipts generated by the transactions included -// in a block given by its hash. +// in a block given by its hash. Receipts will be filled in with context data. func GetBlockReceipts(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (types.Receipts, error) { // Retrieve the potentially incomplete receipts from disk or network receipts := core.GetBlockReceipts(odr.Database(), hash, number) @@ -153,9 +153,8 @@ func GetBlockReceipts(ctx context.Context, odr OdrBackend, hash common.Hash, num } // GetBlockLogs retrieves the logs generated by the transactions included in a -// block given by its hash. +// block given by its hash. Logs will be filled in with context data. func GetBlockLogs(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) ([][]*types.Log, error) { - // Retrieve the potentially incomplete receipts from disk or network receipts := core.GetBlockReceipts(odr.Database(), hash, number) if receipts == nil { r := &ReceiptsRequest{Hash: hash, Number: number}