Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: remove support for pending logs #29574

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,7 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, false),
Service: filters.NewFilterAPI(filterSystem),
}})
return filterSystem
}
Expand Down
15 changes: 8 additions & 7 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ import (
)

var (
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
errInvalidBlockRange = errors.New("invalid block range params")
errExceedMaxTopics = errors.New("exceed max topics")
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
errInvalidBlockRange = errors.New("invalid block range params")
errPendingLogsUnsupported = errors.New("pending logs are not supported")
errExceedMaxTopics = errors.New("exceed max topics")
)

// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
Expand Down Expand Up @@ -70,10 +71,10 @@ type FilterAPI struct {
}

// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
func NewFilterAPI(system *FilterSystem) *FilterAPI {
api := &FilterAPI{
sys: system,
events: NewEventSystem(system, lightMode),
events: NewEventSystem(system),
filters: make(map[rpc.ID]*filter),
timeout: system.cfg.Timeout,
}
Expand Down Expand Up @@ -456,7 +457,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.txs = nil
return hashes, nil
}
case LogsSubscription, MinedAndPendingLogsSubscription:
case LogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil
Expand Down
43 changes: 4 additions & 39 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,9 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
return f.blockLogs(ctx, header)
}

var (
beginPending = f.begin == rpc.PendingBlockNumber.Int64()
endPending = f.end == rpc.PendingBlockNumber.Int64()
)

// special case for pending logs
if beginPending && !endPending {
return nil, errInvalidBlockRange
}

// Short-cut if all we care about is pending logs
if beginPending && endPending {
return f.pendingLogs(), nil
// Disallow pending logs.
if f.begin == rpc.PendingBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
return nil, errPendingLogsUnsupported
}

resolveSpecial := func(number int64) (int64, error) {
Expand Down Expand Up @@ -165,16 +155,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
if err != nil {
// if an error occurs during extraction, we do return the extracted data
return logs, err
}
// Append the pending ones
if endPending {
pendingLogs := f.pendingLogs()
logs = append(logs, pendingLogs...)
}
return logs, nil
return logs, err
}
}
}
Expand Down Expand Up @@ -332,22 +313,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
return logs, nil
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() []*types.Log {
block, receipts, _ := f.sys.backend.Pending()
if block == nil || receipts == nil {
return nil
}
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
return nil
}

// filterLogs creates a slice of logs matching the given criteria.
func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
var check = func(log *types.Log) bool {
Expand Down
195 changes: 9 additions & 186 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -63,7 +61,6 @@ type Backend interface {
GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
Pending() (*types.Block, types.Receipts, *state.StateDB)

CurrentHeader() *types.Header
ChainConfig() *params.ChainConfig
Expand Down Expand Up @@ -152,10 +149,6 @@ const (
UnknownSubscription Type = iota
// LogsSubscription queries for new or removed (chain reorg) logs
LogsSubscription
// PendingLogsSubscription queries for logs in pending blocks
PendingLogsSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries for pending transactions entering
// the pending state
PendingTransactionsSubscription
Expand Down Expand Up @@ -192,10 +185,8 @@ type subscription struct {
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
backend Backend
sys *FilterSystem
lightMode bool
lastHead *types.Header
backend Backend
sys *FilterSystem

// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
Expand All @@ -218,11 +209,10 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
func NewEventSystem(sys *FilterSystem) *EventSystem {
m := &EventSystem{
sys: sys,
backend: sys.backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
Expand Down Expand Up @@ -310,10 +300,11 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
to = rpc.BlockNumber(crit.ToBlock.Int64())
}

// only interested in pending logs
if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribePendingLogs(crit, logs), nil
// Pending logs are not supported anymore.
if from == rpc.PendingBlockNumber || to == rpc.PendingBlockNumber {
return nil, errPendingLogsUnsupported
}

// only interested in new mined logs
if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
Expand All @@ -322,34 +313,13 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
if from >= 0 && to >= 0 && to >= from {
return es.subscribeLogs(crit, logs), nil
}
// interested in mined logs from a specific block number, new logs and pending logs
if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribeMinedPendingLogs(crit, logs), nil
}
// interested in logs from a specific block number to new mined blocks
if from >= 0 && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
}
return nil, errInvalidBlockRange
}

// subscribeMinedPendingLogs creates a subscription that returned mined and
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
Expand All @@ -367,23 +337,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
return es.subscribe(sub)
}

// subscribePendingLogs creates a subscription that writes contract event logs for
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribeNewHeads creates a subscription that writes the header of a block that is
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
Expand Down Expand Up @@ -430,18 +383,6 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
}
}

func (es *EventSystem) handlePendingLogs(filters filterIndex, logs []*types.Log) {
if len(logs) == 0 {
return
}
for _, f := range filters[PendingLogsSubscription] {
matchedLogs := filterLogs(logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}

func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
for _, f := range filters[PendingTransactionsSubscription] {
f.txs <- ev.Txs
Expand All @@ -452,91 +393,6 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 {
continue
}
if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 {
continue
}
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
})
}
}

func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
oldh := es.lastHead
es.lastHead = newHeader
if oldh == nil {
return
}
newh := newHeader
// find common ancestor, create list of rolled back and new block hashes
var oldHeaders, newHeaders []*types.Header
for oldh.Hash() != newh.Hash() {
if oldh.Number.Uint64() >= newh.Number.Uint64() {
oldHeaders = append(oldHeaders, oldh)
oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
}
if oldh.Number.Uint64() < newh.Number.Uint64() {
newHeaders = append(newHeaders, newh)
newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
if newh == nil {
// happens when CHT syncing, nothing to do
newh = oldh
}
}
}
// roll back old blocks
for _, h := range oldHeaders {
callBack(h, true)
}
// check new blocks (array is in reverse order)
for i := len(newHeaders) - 1; i >= 0; i-- {
callBack(newHeaders[i], false)
}
}

// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
if !bloomFilter(header.Bloom, addresses, topics) {
return nil
}
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
unfiltered := append([]*types.Log{}, cached.logs...)
for i, log := range unfiltered {
// Don't modify in-cache elements
logcopy := *log
logcopy.Removed = remove
// Swap copy in-place
unfiltered[i] = &logcopy
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
// Txhash is already resolved
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
return logs
}
// Resolve txhash
body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
for _, log := range logs {
// logs are already copied, safe to modify
log.TxHash = body.Transactions[log.TxIndex].Hash()
}
return logs
}

// eventLoop (un)installs filters and processes mux events.
Expand Down Expand Up @@ -564,46 +420,13 @@ func (es *EventSystem) eventLoop() {
es.handleLogs(index, ev.Logs)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)
// If we have no pending log subscription,
// we don't need to collect any pending logs.
if len(index[PendingLogsSubscription]) == 0 {
continue
}

// Pull the pending logs if there is a new chain head.
pendingBlock, pendingReceipts, _ := es.backend.Pending()
if pendingBlock == nil || pendingReceipts == nil {
continue
}
if pendingBlock.ParentHash() != ev.Block.Hash() {
continue
}
var logs []*types.Log
for _, receipt := range pendingReceipts {
if len(receipt.Logs) > 0 {
logs = append(logs, receipt.Logs...)
}
}
es.handlePendingLogs(index, logs)

case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
index[LogsSubscription][f.id] = f
index[PendingLogsSubscription][f.id] = f
} else {
index[f.typ][f.id] = f
}
index[f.typ][f.id] = f
close(f.installed)

case f := <-es.uninstall:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
delete(index[LogsSubscription], f.id)
delete(index[PendingLogsSubscription], f.id)
} else {
delete(index[f.typ], f.id)
}
delete(index[f.typ], f.id)
close(f.err)

// System stopped
Expand Down
Loading
Loading