Skip to content

Commit

Permalink
Last minute bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout committed Jan 17, 2024
1 parent 4755172 commit 30459f2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic
### Fixed

- abi.etherscan: Fixed handling "rate limit reached" errors.
- cli: Fixed setting logger levels based on config and env variables.
- codegen: Don't create intermediate file `events.json` in ABI directory.
- evm.subsquid: When request to worker fails, ask router for another one instead of retrying the same worker.
- http: Fixed incorrect number of retries performed on failed requests.

## [7.2.2] - 2023-12-27

Expand Down
51 changes: 39 additions & 12 deletions src/dipdup/datasources/evm_subsquid.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,13 @@ async def run(self) -> None:
raise FrameworkException('Subsquid worker datasource should not be run')

async def query(self, query: Query) -> list[dict[str, Any]]:
async with self:
self._logger.debug('Worker query: %s', query)
response = await self.request(
'post',
url='',
json=query,
)
return cast(list[dict[str, Any]], response)
self._logger.debug('Worker query: %s', query)
response = await self.request(
'post',
url='',
json=query,
)
return cast(list[dict[str, Any]], response)


class SubsquidDatasource(IndexDatasource[SubsquidDatasourceConfig]):
Expand All @@ -113,6 +112,29 @@ async def run(self) -> None:
async def subscribe(self) -> None:
pass

# FIXME: Heavily copy-pasted from `HTTPGateway._retry_request`
async def query_worker(self, query: Query, current_level: int) -> list[dict[str, Any]]:
    """Execute an archive query against a Subsquid worker, retrying on failure.

    Unlike plain HTTP retries, each failed attempt asks the router for a
    *fresh* worker instead of hammering the same one.

    :param query: Subsquid archive query payload to execute.
    :param current_level: Level passed to the router when requesting a worker.
    :returns: Decoded JSON response from the worker.
    :raises DatasourceError: Re-raised when the final attempt fails.
    """
    retry_sleep = self._http_config.retry_sleep
    # NOTE: Attempts are 1-based; `retry_count` retries means `retry_count + 1` total tries.
    attempt = 1
    last_attempt = self._http_config.retry_count + 1

    while True:
        try:
            # NOTE: Request a fresh worker after each failed attempt
            worker_datasource = await self._get_worker(current_level)
            async with worker_datasource:
                return await worker_datasource.query(query)
        except DatasourceError as e:
            self._logger.warning('Worker query attempt %s/%s failed: %s', attempt, last_attempt, e)
            if attempt == last_attempt:
                raise e

            self._logger.info('Waiting %s seconds before retry', retry_sleep)
            await asyncio.sleep(retry_sleep)

            attempt += 1
            # Exponential backoff: scale the sleep for the next round.
            retry_sleep *= self._http_config.retry_multiplier

async def iter_event_logs(
self,
topics: tuple[tuple[str | None, str], ...],
Expand Down Expand Up @@ -140,8 +162,7 @@ async def iter_event_logs(
'fromBlock': current_level,
'toBlock': last_level,
}
worker_datasource = await self._get_worker(current_level)
response = await worker_datasource.query(query)
response = await self.query_worker(query, current_level)

for level_item in response:
level = level_item['header']['number']
Expand Down Expand Up @@ -169,8 +190,7 @@ async def iter_transactions(
'toBlock': last_level,
'transactions': list(filters),
}
worker_datasource = await self._get_worker(current_level)
response = await worker_datasource.query(query)
response = await self.query_worker(query, current_level)

for level_item in response:
level = level_item['header']['number']
Expand Down Expand Up @@ -203,6 +223,13 @@ async def _get_worker(self, level: int) -> _SubsquidWorker:
f'{self._config.url}/{level}/worker',
)
).decode()

worker_config = copy(self._config)
worker_config.url = worker_url
if not worker_config.http:
worker_config.http = self._default_http_config

# NOTE: Fail immediately; retries are handled one level up
worker_config.http.retry_count = 0

return _SubsquidWorker(worker_config)
16 changes: 7 additions & 9 deletions src/dipdup/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import hashlib
import logging
import platform
import sys
import time
from collections.abc import Mapping
from contextlib import AbstractAsyncContextManager
Expand Down Expand Up @@ -148,12 +147,12 @@ async def _retry_request(
attempt = 1
retry_sleep = self._config.retry_sleep
retry_count = 0 if env.TEST else self._config.retry_count
retry_count_str = 'inf' if retry_count is sys.maxsize else str(retry_count)
last_attempt = retry_count + 1

Metrics.set_http_errors_in_row(self._url, 0)

while True:
self._logger.debug('HTTP request attempt %s/%s', attempt, retry_count_str)
self._logger.debug('HTTP request attempt %s/%s', attempt, last_attempt)
try:
return await self._request(
method=method,
Expand All @@ -162,9 +161,6 @@ async def _retry_request(
**kwargs,
)
except safe_exceptions as e:
if self._config.retry_count and attempt - 1 == self._config.retry_count:
raise e

ratelimit_sleep: float | None = None
if isinstance(e, aiohttp.ClientResponseError):
Metrics.set_http_error(self._url, e.status)
Expand All @@ -178,12 +174,14 @@ async def _retry_request(
else:
Metrics.set_http_error(self._url, 0)

self._logger.warning('HTTP request attempt %s/%s failed: %s', attempt, retry_count_str, e)
self._logger.info('Waiting %s seconds before retry', ratelimit_sleep or retry_sleep)

self._logger.warning('HTTP request attempt %s/%s failed: %s', attempt, last_attempt, e)
Metrics.set_http_errors_in_row(self._url, attempt)
if attempt == last_attempt:
raise e

self._logger.info('Waiting %s seconds before retry', ratelimit_sleep or retry_sleep)
await asyncio.sleep(ratelimit_sleep or retry_sleep)

attempt += 1
if not ratelimit_sleep:
retry_sleep *= self._config.retry_multiplier
Expand Down
3 changes: 1 addition & 2 deletions src/dipdup/indexes/evm_subsquid_transactions/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@
]


# NOTE: Completely disable padding validation. Too many false positives.
# NOTE: Completely disable padding validation. If data is in Subsquid, node was ok with it.
# NOTE(review): monkey-patches eth_abi decoder classes process-wide; affects every
# consumer of eth_abi in this process, not just this module — presumably intentional.
eth_abi.decoding.ByteStringDecoder.validate_padding_bytes = lambda *a, **kw: None  # type: ignore[method-assign]
eth_abi.decoding.FixedByteSizeDecoder.validate_padding_bytes = lambda *a, **kw: None  # type: ignore[method-assign]
eth_abi.decoding.SignedFixedDecoder.validate_padding_bytes = lambda *a, **kw: None  # type: ignore[method-assign]
eth_abi.decoding.SignedIntegerDecoder.validate_padding_bytes = lambda *a, **kw: None  # type: ignore[method-assign]
eth_abi.decoding.SingleDecoder.validate_padding_bytes = lambda *a, **kw: None  # type: ignore[method-assign]



def prepare_transaction_handler_args(
package: DipDupPackage,
handler_config: SubsquidTransactionsHandlerConfig,
Expand Down

0 comments on commit 30459f2

Please sign in to comment.