Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

fix: OOM when eth_getLogs response too large #860

Merged
merged 10 commits into from
Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 10 additions & 64 deletions rpc/ethereum/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/tharsis/ethermint/rpc/ethereum/namespaces/eth/filters"
"github.com/tharsis/ethermint/rpc/ethereum/types"
"github.com/tharsis/ethermint/server/config"
ethermint "github.com/tharsis/ethermint/types"
Expand Down Expand Up @@ -81,7 +80,6 @@ type Backend interface {
BloomStatus() (uint64, uint64)
GetLogs(hash common.Hash) ([][]*ethtypes.Log, error)
GetLogsByHeight(height *int64) ([][]*ethtypes.Log, error)
GetFilteredBlocks(from int64, to int64, filter [][]filters.BloomIV, filterAddresses bool) ([]int64, error)
ChainConfig() *params.ChainConfig
SetTxDefaults(args evmtypes.TransactionArgs) (evmtypes.TransactionArgs, error)
GetEthereumMsgsFromTendermintBlock(block *tmrpctypes.ResultBlock, blockRes *tmrpctypes.ResultBlockResults) []*evmtypes.MsgEthereumTx
Expand Down Expand Up @@ -945,6 +943,16 @@ func (e *EVMBackend) RPCFeeHistoryCap() int32 {
return e.cfg.JSONRPC.FeeHistoryCap
}

// RPCLogsCap defines the max number of results can be returned from single `eth_getLogs` query.
func (e *EVMBackend) RPCLogsCap() int32 {
return e.cfg.JSONRPC.LogsCap
}

// RPCBlockRangeCap defines the max block range allowed for `eth_getLogs` query.
func (e *EVMBackend) RPCBlockRangeCap() int32 {
return e.cfg.JSONRPC.BlockRangeCap
}

// RPCMinGasPrice returns the minimum gas price for a transaction obtained from
// the node config. If set value is 0, it will default to 20.

Expand Down Expand Up @@ -1017,55 +1025,6 @@ func (e *EVMBackend) BaseFee(height int64) (*big.Int, error) {
return nil, nil
}

// GetFilteredBlocks returns the block height list match the given bloom filters.
func (e *EVMBackend) GetFilteredBlocks(
fedekunze marked this conversation as resolved.
Show resolved Hide resolved
from int64,
to int64,
filters [][]filters.BloomIV,
filterAddresses bool,
) ([]int64, error) {
matchedBlocks := make([]int64, 0)

BLOCKS:
for height := from; height <= to; height++ {
if err := e.ctx.Err(); err != nil {
e.logger.Error("EVMBackend context error", "err", err)
return nil, err
}

h := height
bloom, err := e.BlockBloom(&h)
if err != nil {
e.logger.Error("retrieve header failed", "blockHeight", height, "err", err)
return nil, err
}

for i, filter := range filters {
// filter the header bloom with the addresses
if filterAddresses && i == 0 {
if !checkMatches(bloom, filter) {
continue BLOCKS
}

// the filter doesn't have any topics
if len(filters) == 1 {
matchedBlocks = append(matchedBlocks, height)
continue BLOCKS
}
continue
}

// filter the bloom with topics
if len(filter) > 0 && !checkMatches(bloom, filter) {
continue BLOCKS
}
}
matchedBlocks = append(matchedBlocks, height)
}

return matchedBlocks, nil
}

// GetEthereumMsgsFromTendermintBlock returns all real MsgEthereumTxs from a Tendermint block.
// It also ensures consistency over the correct txs indexes across RPC endpoints
func (e *EVMBackend) GetEthereumMsgsFromTendermintBlock(block *tmrpctypes.ResultBlock, blockRes *tmrpctypes.ResultBlockResults) []*evmtypes.MsgEthereumTx {
Expand Down Expand Up @@ -1100,16 +1059,3 @@ func (e *EVMBackend) GetEthereumMsgsFromTendermintBlock(block *tmrpctypes.Result

return result
}

// checkMatches revised the function from
// https://github.com/ethereum/go-ethereum/blob/401354976bb44f0ad4455ca1e0b5c0dc31d9a5f5/core/types/bloom9.go#L88
func checkMatches(bloom ethtypes.Bloom, filter []filters.BloomIV) bool {
for _, bloomIV := range filter {
if bloomIV.V[0] == bloomIV.V[0]&bloom[bloomIV.I[0]] &&
bloomIV.V[1] == bloomIV.V[1]&bloom[bloomIV.I[1]] &&
bloomIV.V[2] == bloomIV.V[2]&bloom[bloomIV.I[2]] {
return true
}
}
return false
}
9 changes: 5 additions & 4 deletions rpc/ethereum/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ type Backend interface {
HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error)
GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error)
GetLogsByNumber(blockNum types.BlockNumber) ([][]*ethtypes.Log, error)
BlockBloom(height *int64) (ethtypes.Bloom, error)

GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error)
BloomStatus() (uint64, uint64)

GetFilteredBlocks(from int64, to int64, bloomIndexes [][]BloomIV, filterAddresses bool) ([]int64, error)

RPCFilterCap() int32
RPCLogsCap() int32
RPCBlockRangeCap() int32
}

// consider a filter inactive if it has not been polled for within deadline
Expand Down Expand Up @@ -499,7 +500,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit filters.FilterCrit
}

// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
logs, err := filter.Logs(ctx, int(api.backend.RPCLogsCap()), int64(api.backend.RPCBlockRangeCap()))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -560,7 +561,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*et
filter = NewRangeFilter(api.logger, api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
logs, err := filter.Logs(ctx, int(api.backend.RPCLogsCap()), int64(api.backend.RPCBlockRangeCap()))
if err != nil {
return nil, err
}
Expand Down
40 changes: 20 additions & 20 deletions rpc/ethereum/namespaces/eth/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@ func newFilter(logger log.Logger, backend Backend, criteria filters.FilterCriter
}

const (
maxFilterBlocks = 100000
maxToOverhang = 600
maxToOverhang = 600
fedekunze marked this conversation as resolved.
Show resolved Hide resolved
)

// Logs searches the blockchain for matching log entries, returning all from the
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
func (f *Filter) Logs(_ context.Context, logLimit int, blockLimit int64) ([]*ethtypes.Log, error) {
logs := []*ethtypes.Log{}
var err error

Expand All @@ -105,7 +104,7 @@ func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
return nil, errors.Errorf("unknown block header %s", f.criteria.BlockHash.String())
}

return f.blockLogs(header)
return f.blockLogs(header.Number.Int64(), header.Bloom)
}

// Figure out the limits of the filter range
Expand All @@ -131,8 +130,8 @@ func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
f.criteria.ToBlock = big.NewInt(1)
}

if f.criteria.ToBlock.Int64()-f.criteria.FromBlock.Int64() > maxFilterBlocks {
return nil, errors.Errorf("maximum [from, to] blocks distance: %d", maxFilterBlocks)
if f.criteria.ToBlock.Int64()-f.criteria.FromBlock.Int64() > blockLimit {
return nil, errors.Errorf("maximum [from, to] blocks distance: %d", blockLimit)
}

// check bounds
Expand All @@ -145,36 +144,37 @@ func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
from := f.criteria.FromBlock.Int64()
to := f.criteria.ToBlock.Int64()

blocks, err := f.backend.GetFilteredBlocks(from, to, f.bloomFilters, len(f.criteria.Addresses) > 0)
if err != nil {
return nil, err
}
for height := from; height <= to; height++ {
bloom, err := f.backend.BlockBloom(&height)
if err != nil {
return logs, err
yihuang marked this conversation as resolved.
Show resolved Hide resolved
}

for _, height := range blocks {
ethLogs, err := f.backend.GetLogsByNumber(types.BlockNumber(height))
filtered, err := f.blockLogs(height, bloom)
if err != nil {
return logs, errors.Wrapf(err, "failed to fetch block by number %d", height)
return logs, err
yihuang marked this conversation as resolved.
Show resolved Hide resolved
}

for _, ethLog := range ethLogs {
filtered := FilterLogs(ethLog, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
logs = append(logs, filtered...)
// check logs limit
if len(logs)+len(filtered) > logLimit {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I still want to trigger an OOM, I can ensure that len(logs) > maxint/2 as well as len(filtered) > maxint/2 which will bypass this check as they'll overflow and become negative in the addition len(logs)+len(filtered). To properly fix this, you mos def want to check arithmetic by subtraction and comparisons individually against len(logs) then len(filtered)

Copy link
Contributor Author

@yihuang yihuang Dec 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since len(logs) <= logLimit and len(filtered) <= block gas limit / gas per log, minimal gas per log is 375, a typical block gas limit is tens of millions, so it's pretty safe even for 32bit int, basically impossible for 64bit int.

return logs, nil
yihuang marked this conversation as resolved.
Show resolved Hide resolved
}
logs = append(logs, filtered...)
}
return logs, nil
}

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(header *ethtypes.Header) ([]*ethtypes.Log, error) {
if !bloomFilter(header.Bloom, f.criteria.Addresses, f.criteria.Topics) {
func (f *Filter) blockLogs(height int64, bloom ethtypes.Bloom) ([]*ethtypes.Log, error) {
if !bloomFilter(bloom, f.criteria.Addresses, f.criteria.Topics) {
return []*ethtypes.Log{}, nil
}

// DANGER: do not call GetLogs(header.Hash())
// eth header's hash doesn't match tm block hash
logsList, err := f.backend.GetLogsByNumber(types.BlockNumber(header.Number.Int64()))
logsList, err := f.backend.GetLogsByNumber(types.BlockNumber(height))
if err != nil {
return []*ethtypes.Log{}, errors.Wrapf(err, "failed to fetch logs block number %d", header.Number.Int64())
return []*ethtypes.Log{}, errors.Wrapf(err, "failed to fetch logs block number %d", height)
}

unfiltered := make([]*ethtypes.Log, 0)
Expand Down
10 changes: 10 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (

DefaultFeeHistoryCap int32 = 100

DefaultLogsCap int32 = 10000

DefaultBlockRangeCap int32 = 10000

DefaultEVMTimeout = 5 * time.Second
// default 1.0 eth
DefaultTxFeeCap float64 = 1.0
Expand Down Expand Up @@ -78,6 +82,10 @@ type JSONRPCConfig struct {
FeeHistoryCap int32 `mapstructure:"feehistory-cap"`
// Enable defines if the EVM RPC server should be enabled.
Enable bool `mapstructure:"enable"`
// LogsCap defines the max number of results can be returned from single `eth_getLogs` query.
LogsCap int32 `mapstructure:"logs-cap"`
// BlockRangeCap defines the max block range allowed for `eth_getLogs` query.
BlockRangeCap int32 `mapstructure:"block-range-cap"`
fedekunze marked this conversation as resolved.
Show resolved Hide resolved
}

// TLSConfig defines the certificate and matching private key for the server.
Expand Down Expand Up @@ -171,6 +179,8 @@ func DefaultJSONRPCConfig() *JSONRPCConfig {
TxFeeCap: DefaultTxFeeCap,
FilterCap: DefaultFilterCap,
FeeHistoryCap: DefaultFeeHistoryCap,
BlockRangeCap: DefaultBlockRangeCap,
LogsCap: DefaultLogsCap,
}
}

Expand Down