From 64fb4544de8d62c9e92f80a00d792f43922972c6 Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Tue, 10 Aug 2021 10:08:28 +0300
Subject: [PATCH] Index state management and Hasura bugfixes (#117)

---
 src/dipdup/datasources/datasource.py      |  2 +-
 src/dipdup/datasources/tzkt/datasource.py | 98 ++++++++----------
 src/dipdup/dipdup.py                      | 22 ++---
 src/dipdup/hasura.py                      |  3 +-
 src/dipdup/index.py                       | 95 +++++++++++++---------
 tests/integration_tests/test_rollback.py  |  1 +
 6 files changed, 108 insertions(+), 113 deletions(-)

diff --git a/src/dipdup/datasources/datasource.py b/src/dipdup/datasources/datasource.py
index 6c8088dbf..f952a3e05 100644
--- a/src/dipdup/datasources/datasource.py
+++ b/src/dipdup/datasources/datasource.py
@@ -23,7 +23,7 @@ def __call__(self, datasource: 'IndexDatasource', operations: List[OperationData


 class BigMapsCallback(Protocol):
-    def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData]) -> Awaitable[None]:
+    def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData], block: HeadBlockData) -> Awaitable[None]:
         ...

diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index 751422769..50476d6e2 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -320,7 +320,7 @@ def sync_level(self) -> Optional[int]:
     @property
     def block(self) -> HeadBlockData:
         if self._block is None:
-            raise RuntimeError('No message from `head` channel received')
+            raise RuntimeError('Attempt to access head block before the first message')
         return self._block

     async def get_similar_contracts(self, address: str, strict: bool = False) -> List[str]:
@@ -633,87 +633,57 @@ def _default_http_config(self) -> HTTPConfig:
             connection_limit=25,
         )

-    async def _on_operation_message(self, message: List[Dict[str, Any]]) -> None:
-        """Parse and emit raw operations from WS"""
+    async def _extract_message_data(self, channel: str, message: List[Any]) -> Any:
         for item in message:
-            current_level = item['state']
+            head_level = item['state']
             message_type = TzktMessageType(item['type'])
-            self._logger.info('Got operation message, %s, level %s', message_type, current_level)
+            self._logger.debug('`%s` message: %s', channel, message_type.name)

             if message_type == TzktMessageType.STATE:
-                self._sync_level = current_level
-                self._level = current_level
+                if self._sync_level != head_level:
+                    self._logger.info('Datasource level set to %s', head_level)
+                    self._sync_level = head_level
+                    self._level = head_level

             elif message_type == TzktMessageType.DATA:
-                self._level = current_level
-                operations = []
-                for operation_json in item['data']:
-                    operation = self.convert_operation(operation_json)
-                    if operation.status != 'applied':
-                        continue
-                    operations.append(operation)
-                if operations:
-                    self.emit_operations(operations, self.block)
+                self._level = head_level
+                yield item['data']

             elif message_type == TzktMessageType.REORG:
                 if self.level is None:
                     raise RuntimeError
-                self.emit_rollback(self.level, current_level)
+                self.emit_rollback(self.level, head_level)

             else:
                 raise NotImplementedError

-    async def _on_big_map_message(self, message: List[Dict[str, Any]]) -> None:
-        """Parse and emit raw big map diffs from WS"""
-        for item in message:
-            current_level = item['state']
-            message_type = TzktMessageType(item['type'])
-            self._logger.info('Got big map message, %s, level %s', message_type, current_level)
-
-            if message_type == TzktMessageType.STATE:
-                self._sync_level = current_level
-                self._level = current_level
-
-            elif message_type == TzktMessageType.DATA:
-                self._level = current_level
-                big_maps = []
-                for big_map_json in item['data']:
-                    big_map = self.convert_big_map(big_map_json)
-                    big_maps.append(big_map)
-                self.emit_big_maps(big_maps)
-
-            elif message_type == TzktMessageType.REORG:
-                if self.level is None:
-                    raise RuntimeError
-                self.emit_rollback(self.level, current_level)
+    async def _on_operation_message(self, message: List[Dict[str, Any]]) -> None:
+        """Parse and emit raw operations from WS"""
+        async for data in self._extract_message_data('operation', message):
+            operations = []
+            for operation_json in data:
+                operation = self.convert_operation(operation_json)
+                if operation.status != 'applied':
+                    continue
+                operations.append(operation)
+            if operations:
+                self.emit_operations(operations, self.block)

-            else:
-                raise NotImplementedError
+    async def _on_big_map_message(self, message: List[Dict[str, Any]]) -> None:
+        """Parse and emit raw big map diffs from WS"""
+        async for data in self._extract_message_data('big_map', message):
+            big_maps = []
+            for big_map_json in data:
+                big_map = self.convert_big_map(big_map_json)
+                big_maps.append(big_map)
+            self.emit_big_maps(big_maps)

     async def _on_head_message(self, message: List[Dict[str, Any]]) -> None:
-        for item in message:
-            current_level = item['state']
-            message_type = TzktMessageType(item['type'])
-            self._logger.info('Got block message, %s, level %s', message_type, current_level)
-
-            if message_type == TzktMessageType.STATE:
-                self._sync_level = current_level
-                self._level = current_level
-
-            elif message_type == TzktMessageType.DATA:
-                self._level = current_level
-                block_json = item['data']
-                block = self.convert_head_block(block_json)
-                self._block = block
-                self.emit_head(block)
-
-            elif message_type == TzktMessageType.REORG:
-                if self.level is None:
-                    raise RuntimeError
-                self.emit_rollback(self.level, current_level)
-
-            else:
-                raise NotImplementedError
+        async for data in self._extract_message_data('head', message):
+            block = self.convert_head_block(data)
+            self._block = block
+            self.emit_head(block)

     @classmethod
     def convert_operation(cls, operation_json: Dict[str, Any]) -> OperationData:
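The refactoring above deduplicates the STATE/DATA/REORG bookkeeping that was previously copy-pasted across the three channel handlers: one async generator tracks levels and emits rollbacks, while each handler only converts and emits its own payloads. A minimal self-contained sketch of the pattern follows; MessageType and Datasource are toy stand-ins, not the real dipdup classes:

import asyncio
from enum import Enum
from typing import Any, AsyncIterator, Dict, List


class MessageType(Enum):
    STATE = 0
    DATA = 1
    REORG = 2


class Datasource:
    def __init__(self) -> None:
        self.level = None

    async def _extract_message_data(self, channel: str, message: List[Dict[str, Any]]) -> AsyncIterator[Any]:
        # Shared bookkeeping: track levels, report reorgs, yield only DATA payloads
        for item in message:
            message_type = MessageType(item['type'])
            if message_type == MessageType.STATE:
                self.level = item['state']
            elif message_type == MessageType.DATA:
                self.level = item['state']
                yield item['data']
            elif message_type == MessageType.REORG:
                print(f'{channel}: rollback to level {item["state"]}')

    async def on_operation_message(self, message: List[Dict[str, Any]]) -> None:
        # The channel handler shrinks to payload processing only
        async for data in self._extract_message_data('operation', message):
            print(f'{len(data)} operations at level {self.level}')


asyncio.run(Datasource().on_operation_message(
    [{'type': 0, 'state': 1}, {'type': 1, 'state': 2, 'data': [{}, {}]}]
))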
diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py
index ae6bd0aff..522f8b6b0 100644
--- a/src/dipdup/dipdup.py
+++ b/src/dipdup/dipdup.py
@@ -61,7 +61,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
             datasource_name = cast(TzktDatasourceConfig, index_config.datasource).name
             datasource = self._ctx.datasources[datasource_name]
             if not isinstance(datasource, TzktDatasource):
-                raise RuntimeError
+                raise RuntimeError(f'`{datasource_name}` is not a TzktDatasource')
             operation_index = OperationIndex(self._ctx, index_config, datasource)
             self._indexes[index_config.name] = operation_index
             await datasource.add_index(index_config)
@@ -70,7 +70,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
             datasource_name = cast(TzktDatasourceConfig, index_config.datasource).name
             datasource = self._ctx.datasources[datasource_name]
             if not isinstance(datasource, TzktDatasource):
-                raise RuntimeError
+                raise RuntimeError(f'`{datasource_name}` is not a TzktDatasource')
             big_map_index = BigMapIndex(self._ctx, index_config, datasource)
             self._indexes[index_config.name] = big_map_index
             await datasource.add_index(index_config)
@@ -111,22 +111,23 @@ async def dispatch_operations(self, datasource: TzktDatasource, operations: List
             if isinstance(index, OperationIndex) and index.datasource == datasource:
                 index.push(level, operations, block)

-    async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None:
+    async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData], block: HeadBlockData) -> None:
         assert len(set(op.level for op in big_maps)) == 1
         level = big_maps[0].level
         for index in self._indexes.values():
             if isinstance(index, BigMapIndex) and index.datasource == datasource:
-                index.push(level, big_maps)
+                index.push(level, big_maps, block)

     async def _rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
         logger = FormattedLogger(ROLLBACK_HANDLER)
         if from_level - to_level == 1:
             # NOTE: Single level rollbacks are processed at Index level.
-            # NOTE: Notify all indexes with rolled back datasource to skip next level and just verify it
+            # NOTE: Notify all indexes which use the rolled back datasource to drop duplicated operations from the next block
             for index in self._indexes.values():
                 if index.datasource == datasource:
                     # NOTE: Continue to rollback with handler
                     if not isinstance(index, OperationIndex):
+                        self._logger.info('Single level rollback is not supported by `%s` indexes', index._config.kind)
                         break
                     await index.single_level_rollback(from_level)
             else:
@@ -218,15 +219,18 @@ async def run(self, reindex: bool, oneshot: bool) -> None:
         async with AsyncExitStack() as stack:
             worker_tasks = []
             await stack.enter_async_context(tortoise_wrapper(url, models))
+
+            await self._initialize_database(reindex)
             for datasource in self._datasources.values():
                 await stack.enter_async_context(datasource)
+
+            # NOTE: on_configure hook fires after the database and datasources are initialized, but before Hasura is configured
+            await self._on_configure()
+
             if hasura_gateway:
                 await stack.enter_async_context(hasura_gateway)
                 worker_tasks.append(asyncio.create_task(hasura_gateway.configure()))

-            await self._initialize_database(reindex)
-            await self._configure()
-
             self._logger.info('Starting datasources')
             datasource_tasks = [] if oneshot else [asyncio.create_task(d.run()) for d in self._datasources.values()]

@@ -255,7 +259,7 @@ async def migrate_to_v11(self) -> None:
         await codegen.migrate_user_handlers_to_v11()
         self._finish_migration('1.1')

-    async def _configure(self) -> None:
+    async def _on_configure(self) -> None:
         """Run user-defined initial configuration handler"""
         configure_fn = self._config.get_configure_fn()
         await configure_fn(self._ctx)
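The run() changes above reorder startup so the database comes up first, the user's on_configure hook fires once the database and datasources are ready, and Hasura is configured last, when everything it introspects already exists. A minimal sketch of that ordering with AsyncExitStack; the resource names are illustrative:

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def resource(name: str):
    print(f'{name} up')
    yield
    print(f'{name} down')


async def run() -> None:
    async with AsyncExitStack() as stack:
        # Resources enter in order and exit in reverse on shutdown or error
        await stack.enter_async_context(resource('database'))
        await stack.enter_async_context(resource('datasource'))
        print('on_configure hook')  # after DB and datasources, before Hasura
        await stack.enter_async_context(resource('hasura'))


asyncio.run(run())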
diff --git a/src/dipdup/hasura.py b/src/dipdup/hasura.py
index 31931680e..9bd166f82 100644
--- a/src/dipdup/hasura.py
+++ b/src/dipdup/hasura.py
@@ -201,7 +201,8 @@ async def _get_views(self) -> List[str]:
             row[0]
             for row in (
                 await get_connection(None).execute_query(
-                    f"SELECT table_name FROM information_schema.views WHERE table_schema = '{self._database_config.schema_name}'"
+                    f"SELECT table_name FROM information_schema.views WHERE table_schema = '{self._database_config.schema_name}' UNION "
+                    f"SELECT matviewname as table_name FROM pg_matviews WHERE schemaname = '{self._database_config.schema_name}'"
                 )
             )[1]
         ]
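The Hasura fix above is needed because materialized views are listed in the pg_matviews catalog rather than in information_schema.views, so querying only the latter silently skipped them. A hedged standalone sketch of the same query outside dipdup, assuming asyncpg and a placeholder DSN; as a side note, bind parameters avoid interpolating the schema name into the SQL string:

import asyncio
from typing import List

import asyncpg  # third-party dependency, assumed available


async def get_views(schema: str) -> List[str]:
    conn = await asyncpg.connect('postgresql://localhost/dipdup')  # placeholder DSN
    try:
        rows = await conn.fetch(
            # Plain views and materialized views live in different catalogs
            "SELECT table_name FROM information_schema.views WHERE table_schema = $1 "
            "UNION SELECT matviewname AS table_name FROM pg_matviews WHERE schemaname = $1",
            schema,
        )
        return [row['table_name'] for row in rows]
    finally:
        await conn.close()


# asyncio.run(get_views('public'))  # requires a running PostgreSQL instance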
diff --git a/src/dipdup/index.py b/src/dipdup/index.py
index 9d0605b0f..184338f2f 100644
--- a/src/dipdup/index.py
+++ b/src/dipdup/index.py
@@ -42,14 +42,15 @@ def __init__(self, ctx: DipDupContext, config: IndexConfigTemplateT, datasource:
     def datasource(self) -> TzktDatasource:
         return self._datasource

-    async def get_state(self) -> State:
-        """Get state of index containing current level and config hash"""
+    @property
+    def state(self) -> State:
         if self._state is None:
-            await self._initialize_index_state()
-        return cast(State, self._state)
+            raise RuntimeError('Index state is not initialized')
+        return self._state

     async def process(self) -> None:
-        state = await self.get_state()
+        await self._initialize_index_state()
+
         if self._config.last_block:
             last_level = self._config.last_block
             await self._synchronize(last_level, cache=True)
@@ -57,7 +58,7 @@ async def process(self) -> None:
             self._logger.info('Datasource is not active, sync to the latest block')
             last_level = (await self._datasource.get_head_block()).level
             await self._synchronize(last_level)
-        elif self._datasource.sync_level > state.level:
+        elif self._datasource.sync_level > self.state.level:
             self._logger.info('Index is behind datasource, sync to datasource level')
             self._queue.clear()
             last_level = self._datasource.sync_level
@@ -74,25 +75,25 @@ async def _process_queue(self) -> None:
         ...

     async def _enter_sync_state(self, last_level: int) -> Optional[int]:
-        state = await self.get_state()
-        first_level = state.level
+        first_level = self.state.level
         if first_level == last_level:
             return None
         if first_level > last_level:
             raise RuntimeError(f'Attempt to synchronize index from level {first_level} to level {last_level}')
         self._logger.info('Synchronizing index to level %s', last_level)
-        state.hash = None  # type: ignore
-        await state.save()
+        self.state.hash = None  # type: ignore
+        await self.state.save()
         return first_level

     async def _exit_sync_state(self, last_level: int) -> None:
         self._logger.info('Index is synchronized to level %s', last_level)
-        state = await self.get_state()
-        state.level = last_level  # type: ignore
-        await state.save()
+        self.state.level = last_level  # type: ignore
+        await self.state.save()

     async def _initialize_index_state(self) -> None:
-        self._logger.info('Getting index state')
+        if self._state:
+            return
+        self._logger.info('Initializing index state')
         index_config_hash = self._config.hash()
         state = await State.get_or_none(
             index_name=self._config.name,
@@ -138,8 +139,21 @@ def push(self, level: int, operations: List[OperationData], block: Optional[Head
         self._queue.append((level, operations, block))

     async def single_level_rollback(self, from_level: int) -> None:
-        """Ensure next arrived block is the same as rolled back one"""
-        self._rollback_level = from_level
+        """Ensure the next block to arrive is the same as the rolled back one
+
+        Called by IndexDispatcher when the index datasource reports a rollback.
+        """
+        if self._rollback_level:
+            raise RuntimeError('Already in rollback state')
+
+        await self._initialize_index_state()
+        if self.state.level < from_level:
+            self._logger.info('Index level is lower than rollback level, ignoring')
+        elif self.state.level == from_level:
+            self._logger.info('Single level rollback has been triggered')
+            self._rollback_level = from_level
+        else:
+            raise RuntimeError('Index level is higher than rollback level')

     async def _process_queue(self) -> None:
         if not self._queue:
@@ -183,22 +197,26 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
         await self._exit_sync_state(last_level)

     async def _process_level_operations(self, level: int, operations: List[OperationData], block: Optional[HeadBlockData] = None) -> None:
-        state = await self.get_state()
-        if level < state.level:
-            raise RuntimeError(f'Level of operation batch is lower than index state level: {level} < {state.level}')
+        if level < self.state.level:
+            raise RuntimeError(f'Level of operation batch is lower than index state level: {level} < {self.state.level}')

         if self._rollback_level:
-            if state.level != self._rollback_level:
-                raise RuntimeError(f'Rolling back to level {self._rollback_level}, state level {state.level}')
-            if level != self._rollback_level:
-                raise RuntimeError(f'Rolling back to level {self._rollback_level}, got operations of level {level}')
+            levels = {
+                'operations': level,
+                'rollback': self._rollback_level,
+                'index': self.state.level,
+                'block': block.level if block else level,
+            }
+            if len(set(levels.values())) != 1:
+                levels_repr = ', '.join(f'{k}={v}' for k, v in levels.items())
+                raise RuntimeError(f'Index is in a rollback state, but received an operation batch with different levels: {levels_repr}')

             self._logger.info('Rolling back to previous level, verifying processed operations')
             expected_hashes = set(self._last_hashes)
             received_hashes = set([op.hash for op in operations])
             reused_hashes = received_hashes & expected_hashes
             if reused_hashes != expected_hashes:
-                self._logger.warning('Attempted a single level rollback but arrived block differs from processed one')
+                self._logger.warning('Attempted a single level rollback, but the arrived block differs from the processed one')
                 await self._ctx.reindex()

             self._rollback_level = None
@@ -212,10 +230,10 @@ async def _process_level_operations(self, level: int, operations: List[Operation
             self._logger.info('Processing %s operations of level %s', len(operations), level)
             await self._process_operations(operations)

-            state.level = level  # type: ignore
+            self.state.level = level  # type: ignore
             if block:
-                state.hash = block.hash  # type: ignore
-            await state.save()
+                self.state.hash = block.hash  # type: ignore
+            await self.state.save()

     async def _match_operation(self, pattern_config: OperationHandlerPatternConfigT, operation: OperationData) -> bool:
         """Match single operation with pattern"""
@@ -407,10 +425,10 @@ class BigMapIndex(Index):

     def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
         super().__init__(ctx, config, datasource)
-        self._queue: Deque[Tuple[int, List[BigMapData]]] = deque()
+        self._queue: Deque[Tuple[int, List[BigMapData], Optional[HeadBlockData]]] = deque()

-    def push(self, level: int, big_maps: List[BigMapData]):
-        self._queue.append((level, big_maps))
+    def push(self, level: int, big_maps: List[BigMapData], block: Optional[HeadBlockData] = None):
+        self._queue.append((level, big_maps, block))

     async def _process_queue(self):
         if not self._queue:
@@ -418,8 +436,8 @@ async def _process_queue(self):
         self._logger.info('Processing websocket queue')
         with suppress(IndexError):
             while True:
-                level, big_maps = self._queue.popleft()
-                await self._process_level_big_maps(level, big_maps)
+                level, big_maps, block = self._queue.popleft()
+                await self._process_level_big_maps(level, big_maps, block)

     async def _synchronize(self, last_level: int, cache: bool = False) -> None:
         """Fetch operations via Fetcher and pass to message callback"""
@@ -446,17 +464,18 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:

         await self._exit_sync_state(last_level)

-    async def _process_level_big_maps(self, level: int, big_maps: List[BigMapData]):
-        state = await self.get_state()
-        if state.level >= level:
-            raise RuntimeError(state.level, level)
+    async def _process_level_big_maps(self, level: int, big_maps: List[BigMapData], block: Optional[HeadBlockData] = None):
+        if level < self.state.level:
+            raise RuntimeError(f'Level of big map batch is lower than index state level: {level} < {self.state.level}')

         async with in_global_transaction():
             self._logger.info('Processing %s big map diffs of level %s', len(big_maps), level)
             await self._process_big_maps(big_maps)

-            state.level = level  # type: ignore
-            await state.save()
+            self.state.level = level  # type: ignore
+            if block:
+                self.state.hash = block.hash  # type: ignore
+            await self.state.save()

     async def _match_big_map(self, handler_config: BigMapHandlerConfig, big_map: BigMapData) -> bool:
         """Match single big map diff with pattern"""
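The single level rollback flow above boils down to: when TzKT reports a one-block reorg, remember the hashes of the operations already processed at that level; if the replacement block still contains all of them, only the new operations need processing, otherwise a full reindex is required. A simplified synchronous model of that check; the class and method names are illustrative:

from typing import List, Optional, Set


class RollbackChecker:
    """Toy model of OperationIndex's single level rollback bookkeeping"""

    def __init__(self) -> None:
        self._last_hashes: Set[str] = set()
        self._rollback_level: Optional[int] = None
        self.level = 0

    def single_level_rollback(self, from_level: int) -> None:
        # Arm the check only when exactly this level has been processed
        if self.level < from_level:
            print('index is behind rollback level, nothing to do')
        elif self.level == from_level:
            self._rollback_level = from_level
        else:
            raise RuntimeError('Index level is higher than rollback level')

    def process_block(self, level: int, hashes: List[str]) -> None:
        if self._rollback_level == level:
            # Every operation already processed must reappear in the new block
            if not self._last_hashes <= set(hashes):
                print('arrived block differs from processed one: full reindex needed')
            self._rollback_level = None
        self._last_hashes = set(hashes)
        self.level = level


checker = RollbackChecker()
checker.process_block(1, ['op1', 'op2'])
checker.single_level_rollback(1)
checker.process_block(1, ['op1', 'op2', 'op3'])  # same block plus a new operation: fine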
diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py
index aeee9ac25..3ed903159 100644
--- a/tests/integration_tests/test_rollback.py
+++ b/tests/integration_tests/test_rollback.py
@@ -34,6 +34,7 @@ def _get_operation(hash_: str, level: int) -> OperationData:


 # NOTE: Skip synchronization
 async def operation_index_process(self: OperationIndex):
+    await self._initialize_index_state()
     await self._process_queue()
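A closing note on the index state changes: get_state() used to initialize state lazily from several call sites, which is why the test above must now call _initialize_index_state() explicitly before _process_queue(). The new shape, reduced to a toy example, is an idempotent async initializer plus a property that fails loudly when accessed too early; Index and State here are stand-ins for the dipdup classes:

import asyncio
from typing import Optional


class State:
    level = 0


class Index:
    def __init__(self) -> None:
        self._state: Optional[State] = None

    @property
    def state(self) -> State:
        # Properties are synchronous, so fail loudly instead of initializing lazily
        if self._state is None:
            raise RuntimeError('Index state is not initialized')
        return self._state

    async def _initialize_index_state(self) -> None:
        # Idempotent: safe to call from process(), rollbacks, and tests alike
        if self._state:
            return
        self._state = State()


async def main() -> None:
    index = Index()
    await index._initialize_index_state()
    print(index.state.level)


asyncio.run(main())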