Skip to content

Commit

Permalink
subscriptions demo
Browse files Browse the repository at this point in the history
  • Loading branch information
Wizard1209 committed Dec 8, 2024
1 parent 3fe0b21 commit bfeb187
Show file tree
Hide file tree
Showing 6 changed files with 399 additions and 927 deletions.
1,178 changes: 275 additions & 903 deletions pdm.lock

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ dependencies = [
"python-dotenv~=1.0",
"python-json-logger~=2.0",
"ruamel.yaml~=0.18.6",
# "substrate-interface",
"substrate-interface @ git+https://github.com/dipdup-io/py-substrate-interface-async.git@dipdup",
"sentry-sdk~=2.16",
"sqlparse~=0.5",
"starknet-py==0.24.0",
Expand All @@ -81,6 +79,7 @@ dependencies = [
"tortoise-orm==0.21.7",
"uvloop~=0.20",
"web3~=7.2",
"aiosubstrate @ git+https://github.com/dipdup-io/aiosubstrate.git",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -179,9 +178,6 @@ docs = [
perf = [
"scalene",
]
dev = [
"-e file:///${PROJECT_ROOT}/../py-substrate-interface#egg=substrate-interface",
]

[tool.pdm.build]
includes = ["src/dipdup"]
Expand Down
11 changes: 3 additions & 8 deletions src/dipdup/config/substrate_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
from dipdup.config.substrate import SubstrateDatasourceConfigU
from dipdup.config.substrate import SubstrateIndexConfig
from dipdup.config.substrate import SubstrateRuntimeConfig
from dipdup.models.substrate_node import SubstrateNodeHeadSubscription
from dipdup.utils import pascal_to_snake
from dipdup.utils import snake_to_pascal

if TYPE_CHECKING:
from collections.abc import Iterator

from dipdup.subscriptions import Subscription


@dataclass(frozen=True)
class DummySubscription(Subscription):
pass
from dipdup.subscriptions import Subscription


@dataclass(config=ConfigDict(extra='forbid'), kw_only=True)
Expand Down Expand Up @@ -75,5 +71,4 @@ class SubstrateEventsIndexConfig(SubstrateIndexConfig):
last_level: int = 0

def get_subscriptions(self) -> set[Subscription]:
# FIXME: or get_sync_level fails
return {DummySubscription()}
return {SubstrateNodeHeadSubscription(fetch_events=True)}
106 changes: 101 additions & 5 deletions src/dipdup/datasources/substrate_node.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import asyncio
from asyncio import Queue
from collections.abc import Awaitable
from collections.abc import Callable
import logging
import math
from copy import copy
from dataclasses import dataclass
from dataclasses import field
from pathlib import Path
from typing import Any

import orjson
import pysignalr.exceptions
Expand All @@ -13,14 +17,23 @@
from dipdup.config.substrate import SubstrateDatasourceConfigU
from dipdup.datasources import JsonRpcDatasource
from dipdup.exceptions import DatasourceError
from dipdup.models.substrate import BlockHeader
from dipdup.exceptions import FrameworkException
from dipdup.models.substrate import BlockHeader, SubstrateEventData
from dipdup.models.substrate import SubstrateEventDataDict
from dipdup.models.substrate_node import SubstrateNodeHeadSubscription
from dipdup.models.substrate_node import SubstrateNodeSubscription
from dipdup.pysignalr import Message
from dipdup.pysignalr import WebsocketMessage
from dipdup.utils import Watchdog

_logger = logging.getLogger(__name__)


# TODO: think about return type of callback, for now i'm not sure where data should be ingested after the head update
HeadCallback = Callable[['SubstrateNodeDatasource', dict], Awaitable[None]]
EventCallback = Callable[['SubstrateNodeDatasource', tuple[SubstrateEventData, ...]], Awaitable[None]]


@dataclass
class MetadataVersion:
spec_name: str
Expand Down Expand Up @@ -76,8 +89,17 @@ class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateDatasourceConfigU]):
def __init__(self, config: SubstrateDatasourceConfigU) -> None:
from substrateinterface.base import SubstrateInterface
super().__init__(config)
self._pending_subscription = None
self._subscription_ids: dict[str, SubstrateNodeSubscription] = {}
self._interface = SubstrateInterface(config.url)

self._emitter_queue: Queue[dict] = Queue()

self._watchdog: Watchdog = Watchdog(self._http_config.connection_timeout)

self._on_head_callbacks: set[HeadCallback] = set()
self._on_event_callbacks: set[EventCallback] = set()

async def run(self) -> None:
if self.realtime:
await asyncio.gather(
Expand All @@ -99,7 +121,6 @@ async def initialize(self) -> None:
await self._interface.init_props()
self._interface.reload_type_registry()


async def _ws_loop(self) -> None:
# TODO: probably add to inheritance WebsocketSubscriptionDatasource, and move this method there
self._logger.info('Establishing realtime connection')
Expand All @@ -117,6 +138,10 @@ async def _ws_loop(self) -> None:

raise DatasourceError('Websocket connection failed', self.name)

@property
def realtime(self) -> bool:
return self._config.ws_url is not None

async def subscribe(self) -> None:
if not self.realtime:
return
Expand All @@ -131,8 +156,50 @@ async def subscribe(self) -> None:
if isinstance(subscription, SubstrateNodeSubscription):
await self._subscribe(subscription)

# TODO: fix typing
async def emit_head(self, head: dict) -> None:
for fn in self._on_head_callbacks:
await fn(self, head)

async def emit_events(self, events: tuple[SubstrateEventData, ...]) -> None:
for fn in self._on_event_callbacks:
await fn(self, events)

def call_on_head(self, fn: HeadCallback) -> None:
self._on_head_callbacks.add(fn)

def call_on_events(self, fn: EventCallback) -> None:
self._on_event_callbacks.add(fn)

async def _on_message(self, message: Message) -> None:
raise NotImplementedError
# TODO: since only head subscription available we need to load target objects(i.e. events) separately
# to schedule those requests we need to get information about index type from subscription

if not isinstance(message, WebsocketMessage):
raise FrameworkException(f'Unknown message type: {type(message)}')

data = message.data

if 'id' in data:

# NOTE: Save subscription id
if self._pending_subscription:
self._subscription_ids[data['result']] = self._pending_subscription

# NOTE: Possibly unreliable logic from evm_node, and possibly too time consuming for message handling
level = await self.get_head_level()
self._subscriptions.set_sync_level(self._pending_subscription, level)

# NOTE: Set None to identify possible subscriptions conflicts
self._pending_subscription = None
elif 'method' in data and data['method'].startswith('chain_'):
subscription_id = data['params']['subscription']
if subscription_id not in self._subscription_ids:
raise FrameworkException(f'{self.name}: Unknown subscription ID: {subscription_id}')
subscription = self._subscription_ids[subscription_id]
await self._handle_subscription(subscription, data['params']['result'])
else:
raise DatasourceError(f'Unknown message: {data}', self.name)

async def get_head_level(self) -> int:
head = await self._jsonrpc_request('chain_getFinalizedHead', [])
Expand Down Expand Up @@ -233,9 +300,38 @@ async def get_dev_metadata_version(self) -> MetadataVersion | None:
if genesis == last:
return genesis
return None

async def _subscribe(self, subscription: SubstrateNodeSubscription) -> None:
... # TODO: make subscription request to node using subscription.method
self._logger.debug('Subscribing to %s', subscription)
self._pending_subscription = subscription
response = await self._jsonrpc_request(subscription.method, params=[], ws=True)
raise RuntimeError('Subscription answered with %s', response)

async def _handle_subscription(self, subscription: SubstrateNodeSubscription, data: Any) -> None:
if isinstance(subscription, SubstrateNodeHeadSubscription):
# TODO: reimplement for universal message instead of head
self._emitter_queue.put_nowait(data)
else:
raise NotImplementedError

async def _emitter_loop(self) -> None:
while True:
level_data = await self._emitter_queue.get()
# NOTE: for now level_data is always head dict

# TODO: emit head
# self._logger.info('New head: %s -> %s', known_level, head.level)
# await self.emit_head(head)

# NOTE: subscribing to finalized head, no rollback required

# TODO: when there will be indexes other then events made it subscription conditional
# block_hash = await self.get_block_hash(level)
# event_dicts = await self.get_events(block_hash)
# block_header = await self.get_block_header(block_hash)
# events = tuple(SubstrateEventData(**event_dict, header=block_header)
# for event_dict in event_dicts)
# await self.emit_events(events)


# FIXME: Not used, should be a subscan replacement
Expand Down
16 changes: 16 additions & 0 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@
from dipdup.datasources import IndexDatasource
from dipdup.datasources import create_datasource
from dipdup.datasources.evm_node import EvmNodeDatasource
from dipdup.datasources.substrate_node import SubstrateNodeDatasource
from dipdup.datasources.tezos_tzkt import TezosTzktDatasource
from dipdup.datasources.tezos_tzkt import late_tzkt_initialization
from dipdup.exceptions import ConfigInitializationException
from dipdup.exceptions import FrameworkException
from dipdup.hasura import HasuraGateway
from dipdup.indexes.evm_events.index import EvmEventsIndex
from dipdup.indexes.evm_transactions.index import EvmTransactionsIndex
from dipdup.indexes.substrate_events.index import SubstrateEventsIndex
from dipdup.indexes.tezos_big_maps.index import TezosBigMapsIndex
from dipdup.indexes.tezos_events.index import TezosEventsIndex
from dipdup.indexes.tezos_head.index import TezosHeadIndex
Expand Down Expand Up @@ -450,6 +452,9 @@ async def _subscribe_to_datasource_events(self) -> None:
datasource.call_on_events(self._on_evm_node_events)
datasource.call_on_transactions(self._on_evm_node_transactions)
datasource.call_on_syncing(self._on_evm_node_syncing)
elif isinstance(datasource, SubstrateNodeDatasource):
datasource.call_on_head(self._on_substrate_head)
datasource.call_on_events(self._on_substrate_events)

async def _on_tzkt_head(self, datasource: TezosTzktDatasource, head: TezosHeadBlockData) -> None:
# NOTE: Do not await query results, it may block Websocket loop. We do not use Head anyway.
Expand Down Expand Up @@ -546,6 +551,17 @@ async def _on_tzkt_events(self, datasource: TezosTzktDatasource, events: tuple[T
if isinstance(index, TezosEventsIndex) and datasource in index.datasources:
index.push_realtime_message(events)

# TODO: fix data typing
async def _on_substrate_head(self, datasource: SubstrateNodeDatasource, head: dict) -> None:
# TODO: any head updates here?
pass

# TODO: fix data typing
async def _on_substrate_events(self, datasource: SubstrateNodeDatasource, events: tuple[dict, ...]) -> None:
for index in self._indexes.values():
if isinstance(index, SubstrateEventsIndex) and datasource in index.datasources:
index.push_realtime_message(events)

async def _on_rollback(
self,
datasource: IndexDatasource[Any],
Expand Down
9 changes: 3 additions & 6 deletions src/dipdup/models/substrate_node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from abc import ABC
from typing import Any
from typing import Literal

from pydantic.dataclasses import dataclass
Expand All @@ -8,13 +7,11 @@


class SubstrateNodeSubscription(ABC, Subscription):
name: str

def get_params(self) -> list[Any]:
return [self.name]
...


@dataclass(frozen=True)
class SubstrateNodeHeadSubscription(SubstrateNodeSubscription):
name: Literal['finalisedHeads'] = 'finalisedHeads'
method: Literal['chain_subscribeFinalisedHeads'] = 'chain_subscribeFinalisedHeads'
# NOTE: used to determine which objects index require, since we can only subscribe to head
fetch_events: bool = False

0 comments on commit bfeb187

Please sign in to comment.