From def1a05809dbee35e4eb4ca5eb32d614ff671c29 Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Thu, 8 Jul 2021 09:45:31 +0000
Subject: [PATCH] Retry failed requests in DatasourceRequestProxy (#81)

---
 src/dipdup/config.py                          | 14 +++++++
 src/dipdup/datasources/bcd/datasource.py      |  4 +-
 src/dipdup/datasources/coinbase/datasource.py |  9 +----
 src/dipdup/datasources/proxy.py               | 39 +++++++++++++------
 src/dipdup/datasources/tzkt/datasource.py     |  8 +++-
 src/dipdup/dipdup.py                          | 25 ++++++++++--
 6 files changed, 74 insertions(+), 25 deletions(-)

diff --git a/src/dipdup/config.py b/src/dipdup/config.py
index 8951fd232..d6ebbdc3d 100644
--- a/src/dipdup/config.py
+++ b/src/dipdup/config.py
@@ -26,6 +26,8 @@
 CONFIGURE_HANDLER = 'on_configure'
 BLOCK_HANDLER = 'on_block'
 ENV_VARIABLE_REGEX = r'\${([\w]*):-(.*)}'
+DEFAULT_RETRY_COUNT = 3
+DEFAULT_RETRY_SLEEP = 1
 
 sys.path.append(os.getcwd())
 _logger = logging.getLogger(__name__)
@@ -140,6 +142,10 @@ class TzktDatasourceConfig(NameMixin):
     kind: Literal['tzkt']
     url: str
 
+    cache: Optional[bool] = None
+    retry_count: int = DEFAULT_RETRY_COUNT
+    retry_sleep: int = DEFAULT_RETRY_SLEEP
+
     def __hash__(self):
         return hash(self.url)
 
@@ -162,6 +168,10 @@ class BcdDatasourceConfig(NameMixin):
     url: str
     network: str
 
+    cache: Optional[bool] = None
+    retry_count: int = DEFAULT_RETRY_COUNT
+    retry_sleep: int = DEFAULT_RETRY_SLEEP
+
    def __hash__(self):
         return hash(self.url + self.network)
 
@@ -180,6 +190,10 @@ class CoinbaseDatasourceConfig(NameMixin):
     secret_key: Optional[str] = None
     passphrase: Optional[str] = None
 
+    cache: Optional[bool] = None
+    retry_count: int = DEFAULT_RETRY_COUNT
+    retry_sleep: int = DEFAULT_RETRY_SLEEP
+
     def __hash__(self):
         return hash(self.kind)
 
diff --git a/src/dipdup/datasources/bcd/datasource.py b/src/dipdup/datasources/bcd/datasource.py
index 64a8f5b14..c03157fdb 100644
--- a/src/dipdup/datasources/bcd/datasource.py
+++ b/src/dipdup/datasources/bcd/datasource.py
@@ -7,11 +7,11 @@
 
 
 class BcdDatasource:
-    def __init__(self, url: str, network: str, cache: bool) -> None:
+    def __init__(self, url: str, network: str, proxy=DatasourceRequestProxy()) -> None:
         self._url = url.rstrip('/')
         self._network = network
+        self._proxy = proxy
         self._logger = logging.getLogger('dipdup.bcd')
-        self._proxy = DatasourceRequestProxy(cache)
 
     async def close_session(self) -> None:
         await self._proxy.close_session()
diff --git a/src/dipdup/datasources/coinbase/datasource.py b/src/dipdup/datasources/coinbase/datasource.py
index c905d32a8..66e558bc6 100644
--- a/src/dipdup/datasources/coinbase/datasource.py
+++ b/src/dipdup/datasources/coinbase/datasource.py
@@ -2,8 +2,6 @@
 from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, List, Tuple
 
-from aiolimiter import AsyncLimiter
-
 from dipdup.datasources.coinbase.models import CandleData, CandleInterval
 from dipdup.datasources.proxy import DatasourceRequestProxy
 
@@ -13,12 +11,9 @@
 
 
 class CoinbaseDatasource:
-    def __init__(self, cache: bool) -> None:
+    def __init__(self, proxy: DatasourceRequestProxy) -> None:
         self._logger = logging.getLogger('dipdup.coinbase')
-        self._proxy = DatasourceRequestProxy(
-            cache=cache,
-            ratelimiter=AsyncLimiter(max_rate=10, time_period=1),
-        )
+        self._proxy = proxy
 
     async def close_session(self) -> None:
         await self._proxy.close_session()
diff --git a/src/dipdup/datasources/proxy.py b/src/dipdup/datasources/proxy.py
index bde98a86f..f298e3c20 100644
--- a/src/dipdup/datasources/proxy.py
+++ b/src/dipdup/datasources/proxy.py
@@ -1,3 +1,4 @@
+import asyncio
 import hashlib
 import logging
 import pickle
@@ -7,16 +8,40 @@
 from aiolimiter import AsyncLimiter
 from fcache.cache import FileCache  # type: ignore
 
+from dipdup.config import DEFAULT_RETRY_COUNT, DEFAULT_RETRY_SLEEP  # type: ignore
 from dipdup.utils import http_request
 
 
 class DatasourceRequestProxy:
-    def __init__(self, cache: bool = False, ratelimiter: Optional[AsyncLimiter] = None) -> None:
+    """Wrapper for datasource HTTP requests.
+
+    Covers caching, retrying failed requests and ratelimiting"""
+
+    def __init__(
+        self,
+        cache: bool = False,
+        retry_count: int = DEFAULT_RETRY_COUNT,
+        retry_sleep: int = DEFAULT_RETRY_SLEEP,
+        ratelimiter: Optional[AsyncLimiter] = None,
+    ) -> None:
         self._logger = logging.getLogger(__name__)
         self._cache = FileCache('dipdup', flag='cs') if cache else None
+        self._retry_count = retry_count
+        self._retry_sleep = retry_sleep
         self._ratelimiter = ratelimiter
         self._session = aiohttp.ClientSession()
 
+    async def _wrapped_request(self, method: str, **kwargs):
+        for attempt in range(self._retry_count):
+            self._logger.debug('Datasource request attempt %s/%s', attempt + 1, self._retry_count)
+            try:
+                return await http_request(self._session, method, **kwargs)
+            except (aiohttp.ClientConnectionError, aiohttp.ClientConnectorError) as e:
+                if attempt + 1 == self._retry_count:
+                    raise e
+                self._logger.warning('Datasource request failed: %s', e)
+                await asyncio.sleep(self._retry_sleep)
+
     async def http_request(self, method: str, skip_cache: bool = False, weight: int = 1, **kwargs):
         if self._cache is not None and not skip_cache:
             key = hashlib.sha256(pickle.dumps([method, kwargs])).hexdigest()
@@ -25,21 +50,13 @@ async def http_request(self, method: str, skip_cache: bool = False, weight: int
             except KeyError:
                 if self._ratelimiter:
                     await self._ratelimiter.acquire(weight)
-                response = await http_request(
-                    session=self._session,
-                    method=method,
-                    **kwargs,
-                )
+                response = await self._wrapped_request(method, **kwargs)
                 self._cache[key] = response
                 return response
         else:
             if self._ratelimiter:
                 await self._ratelimiter.acquire(weight)
-            response = await http_request(
-                session=self._session,
-                method=method,
-                **kwargs,
-            )
+            response = await self._wrapped_request(method, **kwargs)
             return response
 
     async def close_session(self) -> None:
diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index 2ce6b9f03..6c755f68d 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -263,9 +263,14 @@ class TzktDatasource(IndexDatasource):
     * Calls Matchers to match received operation groups with indexes' pattern and spawn callbacks on match
     """
 
-    def __init__(self, url: str, cache: bool) -> None:
+    def __init__(
+        self,
+        url: str,
+        proxy: DatasourceRequestProxy,
+    ) -> None:
         super().__init__()
         self._url = url.rstrip('/')
+        self._proxy = proxy
         self._logger = logging.getLogger('dipdup.tzkt')
 
         self._transaction_subscriptions: Set[str] = set()
@@ -273,7 +278,6 @@ def __init__(self, url: str, cache: bool) -> None:
         self._big_map_subscriptions: Dict[str, List[str]] = {}
 
         self._client: Optional[BaseHubConnection] = None
-        self._proxy = DatasourceRequestProxy(cache)
         self._level: Optional[int] = None
         self._sync_level: Optional[int] = None
 
diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py
index 82ca71a41..e9066d528 100644
--- a/src/dipdup/dipdup.py
+++ b/src/dipdup/dipdup.py
@@ -6,6 +6,7 @@
 from posix import listdir
 from typing import Dict, List, cast
 
+from aiolimiter import AsyncLimiter
 from apscheduler.schedulers import SchedulerNotRunningError  # type: ignore
 from genericpath import exists
 from tortoise import Tortoise
@@ -33,6 +34,7 @@
 from dipdup.datasources.bcd.datasource import BcdDatasource
 from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
 from dipdup.datasources.datasource import IndexDatasource
+from dipdup.datasources.proxy import DatasourceRequestProxy
 from dipdup.datasources.tzkt.datasource import TzktDatasource
 from dipdup.exceptions import ConfigurationError
 from dipdup.hasura import configure_hasura
@@ -228,20 +230,37 @@ async def _create_datasources(self) -> None:
             if name in self._datasources:
                 continue
 
+            cache = self._config.cache_enabled if datasource_config.cache is None else datasource_config.cache
             if isinstance(datasource_config, TzktDatasourceConfig):
+                proxy = DatasourceRequestProxy(
+                    cache=cache,
+                    retry_count=datasource_config.retry_count,
+                    retry_sleep=datasource_config.retry_sleep,
+                )
                 datasource = TzktDatasource(
                     url=datasource_config.url,
-                    cache=self._config.cache_enabled,
+                    proxy=proxy,
                 )
             elif isinstance(datasource_config, BcdDatasourceConfig):
+                proxy = DatasourceRequestProxy(
+                    cache=cache,
+                    retry_count=datasource_config.retry_count,
+                    retry_sleep=datasource_config.retry_sleep,
+                )
                 datasource = BcdDatasource(
                     url=datasource_config.url,
                     network=datasource_config.network,
-                    cache=self._config.cache_enabled,
+                    proxy=proxy,
                 )
             elif isinstance(datasource_config, CoinbaseDatasourceConfig):
+                proxy = DatasourceRequestProxy(
+                    cache=cache,
+                    retry_count=datasource_config.retry_count,
+                    retry_sleep=datasource_config.retry_sleep,
+                    ratelimiter=AsyncLimiter(max_rate=10, time_period=1),
+                )
                 datasource = CoinbaseDatasource(
-                    cache=self._config.cache_enabled,
+                    proxy=proxy,
                 )
             else:
                 raise NotImplementedError
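
Below is a minimal usage sketch, not part of the patch itself. It assumes only the constructors introduced above and shows how a DatasourceRequestProxy with custom retry settings could be wired into a datasource by hand, which DipDup normally does in _create_datasources; the BCD URL and the retry values are illustrative.

import asyncio

from dipdup.datasources.bcd.datasource import BcdDatasource
from dipdup.datasources.proxy import DatasourceRequestProxy


async def main() -> None:
    # Retry each failed request up to 5 times, sleeping 2 seconds between attempts.
    proxy = DatasourceRequestProxy(cache=False, retry_count=5, retry_sleep=2)
    datasource = BcdDatasource(url='https://api.better-call.dev', network='mainnet', proxy=proxy)
    try:
        pass  # call datasource methods here; connection errors are retried by the proxy
    finally:
        await datasource.close_session()


asyncio.run(main())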