Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with determining the last level when syncing with node #856

Merged
merged 4 commits into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ The format is based on [Keep a Changelog], and this project adheres to [Semantic

## [Unreleased]

### Added

- env: Added `DIPDUP_DEBUG` environment variable to enable debug logging.

### Fixed

- demos: Fixed decimal overflow in `demo_uniswap` project.
- evm.node: Fixed incorrect log request parameters.
- evm.subsquid.events: Fixed issue with determining the last level when syncing with node.

## [7.0.0] - 2023-09-25

Expand Down
4 changes: 3 additions & 1 deletion src/dipdup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import asyncclick as click

from dipdup import __version__
from dipdup import env
from dipdup.install import EPILOG
from dipdup.install import WELCOME_ASCII
from dipdup.performance import metrics
Expand Down Expand Up @@ -216,6 +217,8 @@ async def cli(ctx: click.Context, config: list[str], env_file: list[str]) -> Non
from dipdup.sys import set_up_logging

set_up_logging()
if env.DEBUG:
logging.getLogger('dipdup').setLevel(logging.DEBUG)

env_file_paths = [Path(file) for file in env_file]
config_paths = [Path(file) for file in config]
Expand All @@ -232,7 +235,6 @@ async def cli(ctx: click.Context, config: list[str], env_file: list[str]) -> Non
logging.getLogger('dipdup').setLevel(logging.INFO)
return

from dipdup import env
from dipdup.config import DipDupConfig
from dipdup.exceptions import InitializationRequiredError
from dipdup.package import DipDupPackage
Expand Down
2 changes: 2 additions & 0 deletions src/dipdup/config/evm_subsquid_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class SubsquidEventsIndexConfig(IndexConfig):
:param datasource: Subsquid datasource
:param handlers: Event handlers
:param abi: One or more `evm.abi` datasource(s) for the same network
:param node_only: Don't use Subsquid Archives API (dev only)
:param first_level: Level to start indexing from
:param last_level: Level to stop indexing and disable this index
"""
Expand All @@ -66,6 +67,7 @@ class SubsquidEventsIndexConfig(IndexConfig):
datasource: SubsquidDatasourceConfig
handlers: tuple[SubsquidEventsHandlerConfig, ...] = field(default_factory=tuple)
abi: AbiDatasourceConfig | tuple[AbiDatasourceConfig, ...] | None = None
node_only: bool = False

first_level: int = 0
last_level: int = 0
Expand Down
2 changes: 1 addition & 1 deletion src/dipdup/datasources/evm_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ async def _on_message(self, message: Message) -> None:
if subscription_id not in self._subscription_ids:
raise FrameworkException(f'{self.name}: Unknown subscription ID: {subscription_id}')
subscription = self._subscription_ids[subscription_id]
self._logger.debug('Received subscription for channel %s', subscription_id)
self._logger.info('Received a message from channel %s', subscription_id)
await self._handle_subscription(subscription, data['params']['result'])
else:
raise DatasourceError(f'Unknown method: {data["method"]}', self.name)
Expand Down
35 changes: 24 additions & 11 deletions src/dipdup/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,30 @@ def get_path(key: str) -> Path | None:
def set_test() -> None:
global TEST, REPLAY_PATH
TEST = True
REPLAY_PATH = str(Path(__file__).parent.parent.parent / 'tests' / 'replays')
env['DIPDUP_REPLAY_PATH'] = REPLAY_PATH
REPLAY_PATH = Path(__file__).parent.parent.parent / 'tests' / 'replays'


if get('CI') == 'true':
env['DIPDUP_CI'] = '1'
if platform.system() == 'Linux' and Path('/.dockerenv').exists():
env['DIPDUP_DOCKER'] = '1'
CI: bool
DEBUG: bool
DOCKER: bool
NEXT: bool
REPLAY_PATH: Path | None
TEST: bool

CI = get_bool('DIPDUP_CI')
DOCKER = get_bool('DIPDUP_DOCKER')
NEXT = get_bool('DIPDUP_NEXT')
REPLAY_PATH = get_path('DIPDUP_REPLAY_PATH')
TEST = get_bool('DIPDUP_TEST')

def read() -> None:
global CI, DEBUG, DOCKER, NEXT, REPLAY_PATH, TEST
CI = get_bool('DIPDUP_CI')
DEBUG = get_bool('DIPDUP_DEBUG')
DOCKER = get_bool('DIPDUP_DOCKER')
NEXT = get_bool('DIPDUP_NEXT')
REPLAY_PATH = get_path('DIPDUP_REPLAY_PATH')
TEST = get_bool('DIPDUP_TEST')

if get('CI') == 'true':
CI = True
if platform.system() == 'Linux' and Path('/.dockerenv').exists():
DOCKER = True


read()
3 changes: 2 additions & 1 deletion src/dipdup/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from aiolimiter import AsyncLimiter

from dipdup import __version__
from dipdup import env
from dipdup.config import ResolvedHttpConfig
from dipdup.exceptions import FrameworkException
from dipdup.exceptions import InvalidRequestError
Expand Down Expand Up @@ -143,7 +144,7 @@ async def _retry_request(
"""Retry a request in case of failure sleeping according to config"""
attempt = 1
retry_sleep = self._config.retry_sleep
retry_count = self._config.retry_count
retry_count = 0 if env.TEST else self._config.retry_count
retry_count_str = 'inf' if retry_count is sys.maxsize else str(retry_count)

while True:
Expand Down
15 changes: 6 additions & 9 deletions src/dipdup/indexes/evm_subsquid_events/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,6 @@ async def _process_queue(self) -> None:
break

for message_level, level_logs in logs_by_level.items():
# NOTE: If it's not a next block - resync with Subsquid
if message_level != self.state.level + 1:
self._logger.info('Not enough messages in queue; resyncing to %s', message_level)
self._queue.clear()
self.datasource.set_sync_level(None, message_level)
return

await self._process_level_events(tuple(level_logs), self.topics, message_level)

def get_sync_level(self) -> int:
Expand Down Expand Up @@ -147,16 +140,20 @@ async def _synchronize(self, sync_level: int) -> None:
self._logger.info('Subsquid is %s levels behind; %s available', subsquid_lag, subsquid_available)
if subsquid_available < NODE_SYNC_LIMIT:
use_node = True
elif self._config.node_only:
self._logger.debug('Using node anyway')
use_node = True

# NOTE: Fetch last blocks from node if there are not enough realtime messages in queue
if use_node and self.node_datasources:
sync_level = node_sync_level
sync_level = min(sync_level, node_sync_level)
self._logger.debug('Using node datasource; sync level: %s', sync_level)
topics = set()
for handler in self._config.handlers:
typename = handler.contract.module_name
topics.add(self.topics[typename][handler.name])
# FIXME: This is terribly inefficient (but okay for the last mile); see advanced example in web3.py docs.
for level in range(first_level, sync_level):
for level in range(first_level, sync_level + 1):
# NOTE: Get random one every time
level_logs = await self.random_node.get_logs(
{
Expand Down
36 changes: 36 additions & 0 deletions tests/configs/demo_evm_events_node.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
spec_version: 2.0
package: demo_evm_events

datasources:
ethscan:
kind: abi.etherscan

mainnet_node:
kind: evm.node
url: https://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_KEY:-''}
ws_url: wss://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_KEY:-''}

mainnet_subsquid:
kind: evm.subsquid
url: ${ARCHIVE_URL:-https://v2.archive.subsquid.io/network/ethereum-mainnet}
node: mainnet_node
http:
replay_path: ${DIPDUP_REPLAY_PATH:-}

contracts:
eth_usdt:
kind: evm
address: 0xdac17f958d2ee523a2206206994597c13d831ec7
typename: eth_usdt

indexes:
eth_usdt_events:
kind: evm.subsquid.events
datasource: mainnet_subsquid
handlers:
- callback: on_transfer
contract: eth_usdt
name: Transfer
first_level: 18077421
last_level: 18077421
node_only: true
1 change: 1 addition & 0 deletions tests/test_demos.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ async def assert_run_dao() -> None:
('demo_raw.yml', 'demo_raw', 'init', partial(assert_init, 'demo_raw')),
('demo_evm_events.yml', 'demo_evm_events', 'run', assert_run_evm_events),
('demo_evm_events.yml', 'demo_evm_events', 'init', partial(assert_init, 'demo_evm_events')),
('demo_evm_events_node.yml', 'demo_evm_events', 'run', assert_run_evm_events),
)


Expand Down