From a2f0f09cbc4a02eb22377851cd73894576b25309 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 20 Jul 2024 11:11:57 -0300 Subject: [PATCH] Improve fetching event batches from node --- CHANGELOG.md | 1 + docs/9.release-notes/_8.0_changelog.md | 1 + src/dipdup/datasources/abi_etherscan.py | 1 + src/dipdup/indexes/evm_events/fetcher.py | 17 +++++++++++++--- src/dipdup/indexes/evm_events/index.py | 9 +++++++++ src/dipdup/indexes/evm_node.py | 25 ++++++++++++++++-------- 6 files changed, 43 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c7066ed3..485b547a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic ### Fixed +- evm.events: Improve fetching event batches from node. - models: Fixed `CachedModel` preloading. ## [8.0.0b3] - 2024-07-04 diff --git a/docs/9.release-notes/_8.0_changelog.md b/docs/9.release-notes/_8.0_changelog.md index d9b18ea7e..7cbdfb4e2 100644 --- a/docs/9.release-notes/_8.0_changelog.md +++ b/docs/9.release-notes/_8.0_changelog.md @@ -27,6 +27,7 @@ - config: Fixed (de)serialization of hex strings in config. - config: Fixed setting logging levels according to the config. - evm.events: Fixed matching logs when filtering by topic0. +- evm.events: Improve fetching event batches from node. - evm.subsquid: Fixed typo in `iter_events` method name. - models: Fixed `CachedModel` preloading. - models: Fixed setting default value for `Meta.maxsize`. diff --git a/src/dipdup/datasources/abi_etherscan.py b/src/dipdup/datasources/abi_etherscan.py index f1f486bb1..28f2f8c52 100644 --- a/src/dipdup/datasources/abi_etherscan.py +++ b/src/dipdup/datasources/abi_etherscan.py @@ -16,6 +16,7 @@ class AbiEtherscanDatasource(AbiDatasource[AbiEtherscanDatasourceConfig], EvmAbi ratelimit_rate=1, ratelimit_period=5, ratelimit_sleep=15, + retry_count=5, ) async def run(self) -> None: diff --git a/src/dipdup/indexes/evm_events/fetcher.py b/src/dipdup/indexes/evm_events/fetcher.py index 51d97f44a..9e4b9036e 100644 --- a/src/dipdup/indexes/evm_events/fetcher.py +++ b/src/dipdup/indexes/evm_events/fetcher.py @@ -36,6 +36,16 @@ async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[EvmEventData, . class EvmNodeEventFetcher(EvmNodeFetcher[EvmEventData]): _datasource: EvmNodeDatasource + def __init__( + self, + datasources: tuple[EvmNodeDatasource, ...], + first_level: int, + last_level: int, + addresses: set[str], + ) -> None: + super().__init__(datasources, first_level, last_level) + self._addresses = addresses + async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[EvmEventData, ...]]]: event_iter = self._fetch_by_level() async for level, batch in readahead_by_level(event_iter, limit=EVM_NODE_READAHEAD_LIMIT): @@ -58,9 +68,10 @@ async def _fetch_by_level(self) -> AsyncIterator[tuple[EvmEventData, ...]]: self._last_level, ) event_batch = await self.get_events_batch( - batch_first_level, - batch_last_level, - node, + first_level=batch_first_level, + last_level=batch_last_level, + addresses=self._addresses, + node=node, ) finished = time.time() diff --git a/src/dipdup/indexes/evm_events/index.py b/src/dipdup/indexes/evm_events/index.py index d0413aefd..91b689625 100644 --- a/src/dipdup/indexes/evm_events/index.py +++ b/src/dipdup/indexes/evm_events/index.py @@ -90,10 +90,19 @@ def _create_node_fetcher(self, first_level: int, last_level: int) -> EvmNodeEven if not self.node_datasources: raise FrameworkException('Creating EvmNodeEventFetcher, but no `evm.node` datasources available') + addresses = set() + for handler_config in self._config.handlers: + if handler_config.contract.address: + addresses.add(handler_config.contract.address) + else: + addresses.clear() + break + return EvmNodeEventFetcher( datasources=self.node_datasources, first_level=first_level, last_level=last_level, + addresses=addresses, ) def _match_level_data( diff --git a/src/dipdup/indexes/evm_node.py b/src/dipdup/indexes/evm_node.py index 69e040db3..27b27a88e 100644 --- a/src/dipdup/indexes/evm_node.py +++ b/src/dipdup/indexes/evm_node.py @@ -1,4 +1,5 @@ import asyncio +import logging import random from abc import ABC from collections import defaultdict @@ -15,19 +16,25 @@ MIN_BATCH_SIZE = 10 MAX_BATCH_SIZE = 10000 BATCH_SIZE_UP = 1.1 -BATCH_SIZE_DOWN = 0.5 +BATCH_SIZE_DOWN = 0.65 + + +_logger = logging.getLogger(__name__) class EvmNodeFetcher(Generic[BufferT], DataFetcher[BufferT, EvmNodeDatasource], ABC): def get_next_batch_size(self, batch_size: int, ratelimited: bool) -> int: + old_batch_size = batch_size if ratelimited: batch_size = int(batch_size * BATCH_SIZE_DOWN) else: batch_size = int(batch_size * BATCH_SIZE_UP) batch_size = min(MAX_BATCH_SIZE, batch_size) - batch_size = max(MIN_BATCH_SIZE, batch_size) + batch_size = int(max(MIN_BATCH_SIZE, batch_size)) + if batch_size != old_batch_size: + _logger.debug('Batch size updated: %s -> %s', old_batch_size, batch_size) return int(batch_size) def get_random_node(self) -> EvmNodeDatasource: @@ -66,16 +73,18 @@ async def get_events_batch( self, first_level: int, last_level: int, + addresses: set[str] | None = None, node: EvmNodeDatasource | None = None, ) -> dict[int, list[dict[str, Any]]]: grouped_events: defaultdict[int, list[dict[str, Any]]] = defaultdict(list) node = node or self.get_random_node() - logs = await node.get_events( - { - 'fromBlock': hex(first_level), - 'toBlock': hex(last_level), - }, - ) + params: dict[str, Any] = { + 'fromBlock': hex(first_level), + 'toBlock': hex(last_level), + } + if addresses: + params['address'] = list(addresses) + logs = await node.get_events(params) for log in logs: grouped_events[int(log['blockNumber'], 16)].append(log) return grouped_events