From 4a5a1206a1f80f1c526501b3faa99fb5878a8ee7 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskii Date: Sat, 20 Jul 2024 11:35:39 -0300 Subject: [PATCH] Backport: Improve fetching event batches from node (#1071) (#1072) --- CHANGELOG.md | 1 + src/dipdup/datasources/abi_etherscan.py | 1 + src/dipdup/indexes/evm_node.py | 25 +++++++++++++------ .../indexes/evm_subsquid_events/fetcher.py | 17 ++++++++++--- .../indexes/evm_subsquid_events/index.py | 12 +++++++++ 5 files changed, 45 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 837172349..b2ca936ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic ### Fixed +- evm.events: Improve fetching event batches from node. - models: Fixed `CachedModel` preloading. ## [7.5.8] - 2024-07-04 diff --git a/src/dipdup/datasources/abi_etherscan.py b/src/dipdup/datasources/abi_etherscan.py index 48e666341..b1bf2c314 100644 --- a/src/dipdup/datasources/abi_etherscan.py +++ b/src/dipdup/datasources/abi_etherscan.py @@ -15,6 +15,7 @@ class EtherscanDatasource(AbiDatasource[EtherscanDatasourceConfig]): ratelimit_rate=1, ratelimit_period=5, ratelimit_sleep=15, + retry_count=5, ) async def run(self) -> None: diff --git a/src/dipdup/indexes/evm_node.py b/src/dipdup/indexes/evm_node.py index 8717a384d..f87f12fde 100644 --- a/src/dipdup/indexes/evm_node.py +++ b/src/dipdup/indexes/evm_node.py @@ -1,4 +1,5 @@ import asyncio +import logging import random from abc import ABC from collections import defaultdict @@ -15,7 +16,10 @@ MIN_BATCH_SIZE = 10 MAX_BATCH_SIZE = 10000 BATCH_SIZE_UP = 1.1 -BATCH_SIZE_DOWN = 0.5 +BATCH_SIZE_DOWN = 0.65 + + +_logger = logging.getLogger(__name__) class EvmNodeFetcher(Generic[FetcherBufferT], DataFetcher[FetcherBufferT], ABC): @@ -29,13 +33,16 @@ def __init__( self._datasources = datasources def get_next_batch_size(self, batch_size: int, ratelimited: bool) -> int: + old_batch_size = batch_size if ratelimited: batch_size = int(batch_size * BATCH_SIZE_DOWN) else: batch_size = int(batch_size * BATCH_SIZE_UP) batch_size = min(MAX_BATCH_SIZE, batch_size) - batch_size = max(MIN_BATCH_SIZE, batch_size) + batch_size = int(max(MIN_BATCH_SIZE, batch_size)) + if batch_size != old_batch_size: + _logger.debug('Batch size updated: %s -> %s', old_batch_size, batch_size) return int(batch_size) def get_random_node(self) -> EvmNodeDatasource: @@ -74,16 +81,18 @@ async def get_logs_batch( self, first_level: int, last_level: int, + addresses: set[str] | None = None, node: EvmNodeDatasource | None = None, ) -> dict[int, list[dict[str, Any]]]: grouped_logs: defaultdict[int, list[dict[str, Any]]] = defaultdict(list) node = node or self.get_random_node() - logs = await node.get_logs( - { - 'fromBlock': hex(first_level), - 'toBlock': hex(last_level), - }, - ) + params: dict[str, Any] = { + 'fromBlock': hex(first_level), + 'toBlock': hex(last_level), + } + if addresses: + params['address'] = list(addresses) + logs = await node.get_logs(params) for log in logs: grouped_logs[int(log['blockNumber'], 16)].append(log) return grouped_logs diff --git a/src/dipdup/indexes/evm_subsquid_events/fetcher.py b/src/dipdup/indexes/evm_subsquid_events/fetcher.py index f2f639b90..3880c7b8f 100644 --- a/src/dipdup/indexes/evm_subsquid_events/fetcher.py +++ b/src/dipdup/indexes/evm_subsquid_events/fetcher.py @@ -40,6 +40,16 @@ async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubsquidEventDa class EvmNodeEventFetcher(EvmNodeFetcher[EvmNodeLogData]): _datasource: EvmNodeDatasource + def __init__( + self, + datasources: tuple[EvmNodeDatasource, ...], + first_level: int, + last_level: int, + addresses: set[str], + ) -> None: + super().__init__(datasources, first_level, last_level) + self._addresses = addresses + async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[EvmNodeLogData, ...]]]: event_iter = self._fetch_by_level() async for level, batch in readahead_by_level(event_iter, limit=EVM_NODE_READAHEAD_LIMIT): @@ -62,9 +72,10 @@ async def _fetch_by_level(self) -> AsyncIterator[tuple[EvmNodeLogData, ...]]: self._last_level, ) log_batch = await self.get_logs_batch( - batch_first_level, - batch_last_level, - node, + first_level=batch_first_level, + last_level=batch_last_level, + addresses=self._addresses, + node=node, ) finished = time.time() diff --git a/src/dipdup/indexes/evm_subsquid_events/index.py b/src/dipdup/indexes/evm_subsquid_events/index.py index 4c5b95558..58cf26879 100644 --- a/src/dipdup/indexes/evm_subsquid_events/index.py +++ b/src/dipdup/indexes/evm_subsquid_events/index.py @@ -87,10 +87,22 @@ def _create_subsquid_fetcher(self, first_level: int, last_level: int) -> Subsqui ) def _create_node_fetcher(self, first_level: int, last_level: int) -> EvmNodeEventFetcher: + if not self.node_datasources: + raise FrameworkException('Creating EvmNodeEventFetcher, but no `evm.node` datasources available') + + addresses = set() + for handler_config in self._config.handlers: + if handler_config.contract.address: + addresses.add(handler_config.contract.address) + else: + addresses.clear() + break + return EvmNodeEventFetcher( datasources=self.node_datasources, first_level=first_level, last_level=last_level, + addresses=addresses, ) def _match_level_data(