Skip to content

Commit

Permalink
Index state management and Hasura bugfixes (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Aug 10, 2021
1 parent 9bd38cd commit 64fb454
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 113 deletions.
2 changes: 1 addition & 1 deletion src/dipdup/datasources/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __call__(self, datasource: 'IndexDatasource', operations: List[OperationData


class BigMapsCallback(Protocol):
    """Structural type for big-map subscription callbacks.

    Implementations receive the datasource that produced the diffs, the raw
    big-map diffs of a single block, and the head block they belong to.
    """

    # NOTE: the scraped diff showed both the pre- and post-commit signatures;
    # this is the post-commit one, which carries the `block` argument.
    def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData], block: HeadBlockData) -> Awaitable[None]:
        ...


Expand Down
98 changes: 34 additions & 64 deletions src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def sync_level(self) -> Optional[int]:
@property
def block(self) -> HeadBlockData:
    """Return the latest block received from the `head` WS channel.

    Raises:
        RuntimeError: if accessed before the first `head` message has arrived
            (i.e. `_block` has not been set yet).
    """
    if self._block is None:
        raise RuntimeError('Attempt to access head block before the first message')
    return self._block

async def get_similar_contracts(self, address: str, strict: bool = False) -> List[str]:
Expand Down Expand Up @@ -633,87 +633,57 @@ def _default_http_config(self) -> HTTPConfig:
connection_limit=25,
)

async def _extract_message_data(self, channel: str, message: List[Any]) -> Any:
    """Common WS message envelope handling shared by all channel callbacks.

    Iterates over envelope items, updates datasource levels, and yields the
    `data` payload of DATA messages for the caller to parse (async generator).

    :param channel: Channel name, used for logging only
    :param message: Raw message payload received from the WS connection
    """
    for item in message:
        head_level = item['state']
        message_type = TzktMessageType(item['type'])
        self._logger.debug('`%s` message: %s', channel, message_type.name)

        if message_type == TzktMessageType.STATE:
            # Only update levels when they actually changed to avoid log spam
            if self._sync_level != head_level:
                self._logger.info('Datasource level set to %s', head_level)
                self._sync_level = head_level
                self._level = head_level

        elif message_type == TzktMessageType.DATA:
            self._level = head_level
            yield item['data']

        elif message_type == TzktMessageType.REORG:
            # Cannot roll back before any level has been observed
            if self.level is None:
                raise RuntimeError
            self.emit_rollback(self.level, head_level)

        else:
            raise NotImplementedError

async def _on_operation_message(self, message: List[Dict[str, Any]]) -> None:
    """Parse and emit raw operations from WS"""
    async for data in self._extract_message_data('operation', message):
        operations = []
        for operation_json in data:
            operation = self.convert_operation(operation_json)
            # Skip failed/backtracked operations; only applied ones are indexed
            if operation.status != 'applied':
                continue
            operations.append(operation)
        # Do not emit empty batches (all operations may have been filtered out)
        if operations:
            self.emit_operations(operations, self.block)
async def _on_big_map_message(self, message: List[Dict[str, Any]]) -> None:
    """Parse and emit raw big map diffs from WS"""
    async for data in self._extract_message_data('big_map', message):
        # Convert every raw diff; unlike operations, big maps carry no
        # per-item status filter here
        big_maps = []
        for big_map_json in data:
            big_map = self.convert_big_map(big_map_json)
            big_maps.append(big_map)
        self.emit_big_maps(big_maps)
async def _on_head_message(self, message: List[Dict[str, Any]]) -> None:
    """Parse head block data from WS, cache it, and emit a `head` event."""
    async for data in self._extract_message_data('head', message):
        block = self.convert_head_block(data)
        # Cache the latest head block so the `block` property can serve it
        self._block = block
        self.emit_head(block)

@classmethod
def convert_operation(cls, operation_json: Dict[str, Any]) -> OperationData:
Expand Down
22 changes: 13 additions & 9 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
datasource_name = cast(TzktDatasourceConfig, index_config.datasource).name
datasource = self._ctx.datasources[datasource_name]
if not isinstance(datasource, TzktDatasource):
raise RuntimeError
raise RuntimeError(f'`{datasource_name}` is not a TzktDatasource')
operation_index = OperationIndex(self._ctx, index_config, datasource)
self._indexes[index_config.name] = operation_index
await datasource.add_index(index_config)
Expand All @@ -70,7 +70,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
datasource_name = cast(TzktDatasourceConfig, index_config.datasource).name
datasource = self._ctx.datasources[datasource_name]
if not isinstance(datasource, TzktDatasource):
raise RuntimeError
raise RuntimeError(f'`{datasource_name}` is not a TzktDatasource')
big_map_index = BigMapIndex(self._ctx, index_config, datasource)
self._indexes[index_config.name] = big_map_index
await datasource.add_index(index_config)
Expand Down Expand Up @@ -111,22 +111,23 @@ async def dispatch_operations(self, datasource: TzktDatasource, operations: List
if isinstance(index, OperationIndex) and index.datasource == datasource:
index.push(level, operations, block)

async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData], block: HeadBlockData) -> None:
    """Push big-map diffs of a single block to every matching BigMapIndex.

    :param datasource: Datasource that produced the diffs; only indexes bound
        to it are notified
    :param big_maps: Big-map diffs, all required to belong to the same level
    :param block: Head block the diffs belong to
    """
    # All diffs in a batch must come from exactly one level
    assert len(set(op.level for op in big_maps)) == 1
    level = big_maps[0].level
    for index in self._indexes.values():
        if isinstance(index, BigMapIndex) and index.datasource == datasource:
            index.push(level, big_maps, block)

async def _rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
logger = FormattedLogger(ROLLBACK_HANDLER)
if from_level - to_level == 1:
# NOTE: Single level rollbacks are processed at Index level.
# NOTE: Notify all indexes with rolled back datasource to skip next level and just verify it
# NOTE: Notify all indexes which use rolled back datasource to drop duplicated operations from the next block
for index in self._indexes.values():
if index.datasource == datasource:
# NOTE: Continue to rollback with handler
if not isinstance(index, OperationIndex):
self._logger.info('Single level rollback is not supported by `%s` indexes', index._config.kind)
break
await index.single_level_rollback(from_level)
else:
Expand Down Expand Up @@ -218,15 +219,18 @@ async def run(self, reindex: bool, oneshot: bool) -> None:
async with AsyncExitStack() as stack:
worker_tasks = []
await stack.enter_async_context(tortoise_wrapper(url, models))

await self._initialize_database(reindex)
for datasource in self._datasources.values():
await stack.enter_async_context(datasource)

# NOTE: on_configure hook fires after database and datasources are initialized but before Hasura is
await self._on_configure()

if hasura_gateway:
await stack.enter_async_context(hasura_gateway)
worker_tasks.append(asyncio.create_task(hasura_gateway.configure()))

await self._initialize_database(reindex)
await self._configure()

self._logger.info('Starting datasources')
datasource_tasks = [] if oneshot else [asyncio.create_task(d.run()) for d in self._datasources.values()]

Expand Down Expand Up @@ -255,7 +259,7 @@ async def migrate_to_v11(self) -> None:
await codegen.migrate_user_handlers_to_v11()
self._finish_migration('1.1')

async def _on_configure(self) -> None:
    """Run user-defined initial configuration handler"""
    configure_fn = self._config.get_configure_fn()
    await configure_fn(self._ctx)
Expand Down
3 changes: 2 additions & 1 deletion src/dipdup/hasura.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ async def _get_views(self) -> List[str]:
row[0]
for row in (
await get_connection(None).execute_query(
f"SELECT table_name FROM information_schema.views WHERE table_schema = '{self._database_config.schema_name}'"
f"SELECT table_name FROM information_schema.views WHERE table_schema = '{self._database_config.schema_name}' UNION "
f"SELECT matviewname as table_name FROM pg_matviews WHERE schemaname = '{self._database_config.schema_name}'"
)
)[1]
]
Expand Down
Loading

0 comments on commit 64fb454

Please sign in to comment.