Skip to content

Commit

Permalink
Backport: Improve fetching event batches from node (#1071) (#1072)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jul 20, 2024
1 parent c0d3cde commit 4a5a120
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic

### Fixed

- evm.events: Improve fetching event batches from node.
- models: Fixed `CachedModel` preloading.

## [7.5.8] - 2024-07-04
Expand Down
1 change: 1 addition & 0 deletions src/dipdup/datasources/abi_etherscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class EtherscanDatasource(AbiDatasource[EtherscanDatasourceConfig]):
ratelimit_rate=1,
ratelimit_period=5,
ratelimit_sleep=15,
retry_count=5,
)

async def run(self) -> None:
Expand Down
25 changes: 17 additions & 8 deletions src/dipdup/indexes/evm_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
import random
from abc import ABC
from collections import defaultdict
Expand All @@ -15,7 +16,10 @@
MIN_BATCH_SIZE = 10
MAX_BATCH_SIZE = 10000
BATCH_SIZE_UP = 1.1
BATCH_SIZE_DOWN = 0.5
BATCH_SIZE_DOWN = 0.65


_logger = logging.getLogger(__name__)


class EvmNodeFetcher(Generic[FetcherBufferT], DataFetcher[FetcherBufferT], ABC):
Expand All @@ -29,13 +33,16 @@ def __init__(
self._datasources = datasources

def get_next_batch_size(self, batch_size: int, ratelimited: bool) -> int:
old_batch_size = batch_size
if ratelimited:
batch_size = int(batch_size * BATCH_SIZE_DOWN)
else:
batch_size = int(batch_size * BATCH_SIZE_UP)

batch_size = min(MAX_BATCH_SIZE, batch_size)
batch_size = max(MIN_BATCH_SIZE, batch_size)
batch_size = int(max(MIN_BATCH_SIZE, batch_size))
if batch_size != old_batch_size:
_logger.debug('Batch size updated: %s -> %s', old_batch_size, batch_size)
return int(batch_size)

def get_random_node(self) -> EvmNodeDatasource:
Expand Down Expand Up @@ -74,16 +81,18 @@ async def get_logs_batch(
self,
first_level: int,
last_level: int,
addresses: set[str] | None = None,
node: EvmNodeDatasource | None = None,
) -> dict[int, list[dict[str, Any]]]:
grouped_logs: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
node = node or self.get_random_node()
logs = await node.get_logs(
{
'fromBlock': hex(first_level),
'toBlock': hex(last_level),
},
)
params: dict[str, Any] = {
'fromBlock': hex(first_level),
'toBlock': hex(last_level),
}
if addresses:
params['address'] = list(addresses)
logs = await node.get_logs(params)
for log in logs:
grouped_logs[int(log['blockNumber'], 16)].append(log)
return grouped_logs
17 changes: 14 additions & 3 deletions src/dipdup/indexes/evm_subsquid_events/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubsquidEventDa
class EvmNodeEventFetcher(EvmNodeFetcher[EvmNodeLogData]):
_datasource: EvmNodeDatasource

def __init__(
self,
datasources: tuple[EvmNodeDatasource, ...],
first_level: int,
last_level: int,
addresses: set[str],
) -> None:
super().__init__(datasources, first_level, last_level)
self._addresses = addresses

async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[EvmNodeLogData, ...]]]:
event_iter = self._fetch_by_level()
async for level, batch in readahead_by_level(event_iter, limit=EVM_NODE_READAHEAD_LIMIT):
Expand All @@ -62,9 +72,10 @@ async def _fetch_by_level(self) -> AsyncIterator[tuple[EvmNodeLogData, ...]]:
self._last_level,
)
log_batch = await self.get_logs_batch(
batch_first_level,
batch_last_level,
node,
first_level=batch_first_level,
last_level=batch_last_level,
addresses=self._addresses,
node=node,
)

finished = time.time()
Expand Down
12 changes: 12 additions & 0 deletions src/dipdup/indexes/evm_subsquid_events/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,22 @@ def _create_subsquid_fetcher(self, first_level: int, last_level: int) -> Subsqui
)

def _create_node_fetcher(self, first_level: int, last_level: int) -> EvmNodeEventFetcher:
if not self.node_datasources:
raise FrameworkException('Creating EvmNodeEventFetcher, but no `evm.node` datasources available')

addresses = set()
for handler_config in self._config.handlers:
if handler_config.contract.address:
addresses.add(handler_config.contract.address)
else:
addresses.clear()
break

return EvmNodeEventFetcher(
datasources=self.node_datasources,
first_level=first_level,
last_level=last_level,
addresses=addresses,
)

def _match_level_data(
Expand Down

0 comments on commit 4a5a120

Please sign in to comment.