Skip to content

Commit

Permalink
various fixes and todos
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout committed Dec 26, 2024
1 parent 1e239ce commit 7a2bce8
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 159 deletions.
17 changes: 17 additions & 0 deletions src/dipdup/datasources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import TypeVar
from uuid import uuid4

from pysignalr.exceptions import ConnectionError
from pysignalr.messages import CompletionMessage

from dipdup.config import DatasourceConfig
Expand Down Expand Up @@ -160,6 +161,22 @@ def _get_ws_client(self) -> WebsocketTransport:

return self._ws_client

async def _ws_loop(self) -> None:
self._logger.info('Establishing realtime connection')
client = self._get_ws_client()
retry_sleep = self._http_config.retry_sleep

for _ in range(1, self._http_config.retry_count + 1):
try:
await client.run()
except ConnectionError as e:
self._logger.error('Websocket connection error: %s', e)
await self.emit_disconnected()
await asyncio.sleep(retry_sleep)
retry_sleep *= self._http_config.retry_multiplier

raise DatasourceError('Websocket connection failed', self.name)


# FIXME: Not necessary a WS datasource
class JsonRpcDatasource(WebsocketDatasource[DatasourceConfigT]):
Expand Down
38 changes: 38 additions & 0 deletions src/dipdup/datasources/_aiosubstrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from collections.abc import Callable
from typing import Any

from aiosubstrate import SubstrateInterface

from dipdup.datasources import JsonRpcDatasource


class SubstrateInterfaceProxy(SubstrateInterface):
def __init__(self, datasource: JsonRpcDatasource[Any]) -> None:
super().__init__(datasource.url) # type: ignore[no-untyped-call]
self._datasource = datasource

async def http_request(
self,
method: str,
params: list[Any],
) -> Any:
return await self._datasource._jsonrpc_request(
method=method,
params=params,
raw=True,
ws=False,
)

async def websocket_request(
self,
method: str,
params: Any,
result_handler: Callable[..., Any] | None = None,
) -> Any:
assert not result_handler
return await self._datasource._jsonrpc_request(
method=method,
params=params,
raw=True,
ws=True,
)
19 changes: 0 additions & 19 deletions src/dipdup/datasources/evm_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
from typing import TYPE_CHECKING
from typing import Any

import pysignalr
import pysignalr.exceptions

from dipdup.config import HttpConfig
from dipdup.config.evm_node import EvmNodeDatasourceConfig
from dipdup.datasources import JsonRpcDatasource
Expand Down Expand Up @@ -164,22 +161,6 @@ async def _emitter_loop(self) -> None:

del self._level_data[head.hash]

async def _ws_loop(self) -> None:
self._logger.info('Establishing realtime connection')
client = self._get_ws_client()
retry_sleep = self._http_config.retry_sleep

for _ in range(1, self._http_config.retry_count + 1):
try:
await client.run()
except pysignalr.exceptions.ConnectionError as e:
self._logger.error('Websocket connection error: %s', e)
await self.emit_disconnected()
await asyncio.sleep(retry_sleep)
retry_sleep *= self._http_config.retry_multiplier

raise DatasourceError('Websocket connection failed', self.name)

@property
def ws_available(self) -> bool:
return self._config.ws_url is not None
Expand Down
43 changes: 12 additions & 31 deletions src/dipdup/datasources/substrate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from copy import copy
from dataclasses import dataclass
from dataclasses import field
from functools import partial
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any

import orjson
import pysignalr.exceptions

from dipdup.config import HttpConfig
from dipdup.config.substrate_node import SubstrateNodeDatasourceConfig
Expand All @@ -28,7 +28,9 @@
from dipdup.models.substrate_node import SubstrateNodeSubscription
from dipdup.pysignalr import Message
from dipdup.pysignalr import WebsocketMessage
from dipdup.utils import Watchdog

if TYPE_CHECKING:
from aiosubstrate.base import SubstrateInterface

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -97,20 +99,12 @@ class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateNodeDatasourceConfig]):
)

def __init__(self, config: SubstrateNodeDatasourceConfig) -> None:
from aiosubstrate.base import SubstrateInterface

# NOTE: Use our aiohttp session and limiters
SubstrateInterface.http_request = partial(self._jsonrpc_request, raw=True) # type: ignore[method-assign]

super().__init__(config)
self._pending_subscription: SubstrateNodeSubscription | None = None
self._subscription_ids: dict[str, SubstrateNodeSubscription] = {}
self._interface = SubstrateInterface(config.url) # type: ignore[no-untyped-call]

self._emitter_queue: Queue[SubscriptionMessage] = Queue()

self._watchdog: Watchdog = Watchdog(self._http_config.connection_timeout)

self._on_head_callbacks: set[HeadCallback] = set()
self._on_event_callbacks: set[EventCallback] = set()

Expand All @@ -119,7 +113,6 @@ async def run(self) -> None:
await asyncio.gather(
self._ws_loop(),
self._emitter_loop(),
# self._watchdog.run(),
)
else:
while True:
Expand All @@ -135,22 +128,11 @@ async def initialize(self) -> None:
await self._interface.init_props() # type: ignore[no-untyped-call]
self._interface.reload_type_registry()

async def _ws_loop(self) -> None:
# TODO: probably add to inheritance WebsocketSubscriptionDatasource, and move this method there
self._logger.info('Establishing realtime connection')
client = self._get_ws_client()
retry_sleep = self._http_config.retry_sleep

for _ in range(1, self._http_config.retry_count + 1):
try:
await client.run()
except pysignalr.exceptions.ConnectionError as e:
self._logger.error('Websocket connection error: %s', e)
await self.emit_disconnected()
await asyncio.sleep(retry_sleep)
retry_sleep *= self._http_config.retry_multiplier
@cached_property
def _interface(self) -> 'SubstrateInterface':
from dipdup.datasources._aiosubstrate import SubstrateInterfaceProxy

raise DatasourceError('Websocket connection failed', self.name)
return SubstrateInterfaceProxy(self)

@property
def ws_available(self) -> bool:
Expand Down Expand Up @@ -224,11 +206,10 @@ async def get_block_hash(self, height: int) -> str:

async def get_block_header(self, hash: str) -> _BlockHeader:
response = await self._jsonrpc_request('chain_getHeader', [hash])
# FIXME: missing fields
return {
return { # type: ignore[typeddict-item]
**response,
'hash': hash,
'number': int(response['number'], 16),
'prev_root': response['parentHash'],
}

async def get_metadata_header(self, height: int) -> MetadataHeader:
Expand Down Expand Up @@ -342,7 +323,7 @@ async def _emitter_loop(self) -> None:
self._logger.info('New head: %s', level)
await self.emit_head(level_data.head)

# NOTE: subscribing to finalized head, no rollback required
# NOTE: Subscribing to finalized head, no rollback handling required

if level_data.fetch_events:
block_hash = await self.get_block_hash(level)
Expand Down
45 changes: 8 additions & 37 deletions src/dipdup/datasources/tezos_tzkt.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import logging
from asyncio import Event
from collections import defaultdict
Expand All @@ -13,10 +12,8 @@
from functools import partial
from typing import Any
from typing import NamedTuple
from typing import NoReturn
from typing import cast

import pysignalr.exceptions
from pysignalr.client import SignalRClient
from pysignalr.messages import CompletionMessage

Expand All @@ -28,7 +25,7 @@
from dipdup.config.tezos_tzkt import TZKT_API_URLS
from dipdup.config.tezos_tzkt import TezosTzktDatasourceConfig
from dipdup.datasources import Datasource
from dipdup.datasources import IndexDatasource
from dipdup.datasources import WebsocketDatasource
from dipdup.exceptions import DatasourceError
from dipdup.exceptions import FrameworkException
from dipdup.models import Head
Expand All @@ -46,6 +43,7 @@
from dipdup.models.tezos_tzkt import HeadSubscription
from dipdup.models.tezos_tzkt import TezosTzktMessageType
from dipdup.models.tezos_tzkt import TezosTzktSubscription
from dipdup.pysignalr import WebsocketTransport
from dipdup.utils import split_by_chunks

ORIGINATION_REQUEST_LIMIT = 100
Expand Down Expand Up @@ -231,7 +229,7 @@ def get_address(self, code_hash: int, type_hash: int) -> str:
return self._hashes_to_address[(code_hash, type_hash)]


class TezosTzktDatasource(IndexDatasource[TezosTzktDatasourceConfig]):
class TezosTzktDatasource(WebsocketDatasource[TezosTzktDatasourceConfig]):
_default_http_config = HttpConfig(
retry_sleep=1,
retry_multiplier=1.1,
Expand Down Expand Up @@ -281,22 +279,8 @@ async def __aenter__(self) -> None:
def request_limit(self) -> int:
return self._http_config.batch_size

# FIXME: Join retry logic with other index datasources
async def run(self) -> None:
self._logger.info('Establishing realtime connection')
signalr_client = self._get_signalr_client()
retry_sleep = self._http_config.retry_sleep

for _ in range(1, self._http_config.retry_count + 1):
try:
await signalr_client.run()
except pysignalr.exceptions.ConnectionError as e:
self._logger.error('Websocket connection error: %s', e)
await self.emit_disconnected()
await asyncio.sleep(retry_sleep)
retry_sleep *= self._http_config.retry_multiplier

raise DatasourceError('Websocket connection failed', self.name)
await self._ws_loop()

async def initialize(self) -> None:
head_block = await self.get_head_block()
Expand Down Expand Up @@ -1201,6 +1185,9 @@ async def _iter_batches(
else:
offset += self.request_limit

def _get_ws_client(self) -> WebsocketTransport:
return self._get_signalr_client()._transport # type: ignore[return-value]

def _get_signalr_client(self) -> SignalRClient:
"""Create SignalR client, register message callbacks"""
if self._signalr_client:
Expand Down Expand Up @@ -1235,23 +1222,7 @@ async def _send(
client = self._get_signalr_client()
await client.send(method, arguments, on_invocation) # type: ignore[arg-type]

async def _on_connected(self) -> None:
self._logger.info('Realtime connection established')
# NOTE: Subscribing here will block WebSocket loop, don't do it.
await self.emit_connected()

async def _on_disconnected(self) -> None:
self._logger.info('Realtime connection lost, resetting subscriptions')
self._subscriptions.reset()
# NOTE: Just in case
self._contract_hashes.reset()
await self.emit_disconnected()

async def _on_error(self, message: CompletionMessage) -> NoReturn:
"""Raise exception from WS server's error message"""
raise DatasourceError(datasource=self.name, msg=cast(str, message.error))

async def _on_message(self, type_: TezosTzktMessageType, message: list[dict[str, Any]]) -> None:
async def _on_message(self, type_: TezosTzktMessageType, message: list[dict[str, Any]]) -> None: # type: ignore[override]
"""Parse message received from Websocket, ensure it's correct in the current context and yield data."""
# NOTE: Parse messages and either buffer or yield data
for item in message:
Expand Down
45 changes: 21 additions & 24 deletions src/dipdup/models/substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,23 @@
from dipdup.runtimes import SubstrateRuntime


class _BlockHeaderExtra(TypedDict):
class _BlockHeader(TypedDict):
hash: str
number: int
parentHash: str
stateRoot: str
extrinsicsRoot: str
digest: dict[str, dict[str, list[str]]]


class _BlockHeaderExtra(TypedDict):
hash: str
number: int
parentHash: str
stateRoot: str
extrinsicsRoot: str
digest: str
digest: dict[str, dict[str, list[str]]]

specName: str
specVersion: int
implName: str
Expand All @@ -35,12 +45,6 @@ class _SubstrateSubsquidEventResponse(TypedDict):
header: _BlockHeaderExtra


class _BlockHeader(TypedDict):
hash: str
number: int
prev_root: str


class _SubstrateNodeEventResponse(TypedDict):
name: str
index: int
Expand All @@ -55,7 +59,6 @@ class SubstrateEventData(HasLevel):
index: int
extrinsic_index: int
call_address: list[str] | None
# we receive decoded args from node datasource and encoded from subsquid datasource
args: list[Any] | None = None
decoded_args: dict[str, Any] | None = None
header: _BlockHeader
Expand Down Expand Up @@ -84,11 +87,7 @@ def from_subsquid(cls, event_dict: _SubstrateSubsquidEventResponse) -> Self:
call_address=event_dict['callAddress'],
args=event_dict['args'],
decoded_args=None,
header={
'hash': event_dict['header']['hash'],
'number': event_dict['header']['number'],
'prev_root': event_dict['header']['parentHash'],
},
header=event_dict['header'],
header_extra=event_dict['header'],
)

Expand All @@ -112,20 +111,18 @@ class SubstrateEvent(Generic[PayloadT]):
# TODO: Use lazy decoding in other models with typed payload
@cached_property
def payload(self) -> PayloadT:
# NOTE: from node datasource
# NOTE: We receive decoded args from node datasource and encoded from subsquid datasource
if self.data.decoded_args is not None:
return cast(PayloadT, self.data.decoded_args)

# NOTE: from subsquid datasource
assert self.data.args is not None and self.data.header_extra is not None
return cast(
PayloadT,
self.runtime.decode_event_args(
payload = self.data.decoded_args
elif self.data.args is not None and self.data.header_extra is not None:
payload = self.runtime.decode_event_args(
name=self.name,
args=self.data.args,
spec_version=str(self.data.header_extra['specVersion']),
),
)
)
else:
raise NotImplementedError
return cast(PayloadT, payload)

@property
def level(self) -> int:
Expand Down
Loading

0 comments on commit 7a2bce8

Please sign in to comment.