Index state management and Hasura bugfixes #117

Merged · 5 commits · Aug 10, 2021
2 changes: 1 addition & 1 deletion src/dipdup/datasources/datasource.py
@@ -23,7 +23,7 @@ def __call__(self, datasource: 'IndexDatasource', operations: List[OperationData


 class BigMapsCallback(Protocol):
-    def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData]) -> Awaitable[None]:
+    def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData], block: HeadBlockData) -> Awaitable[None]:
         ...
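
For downstream code, the change above means big map subscribers now receive the head block together with each batch. A minimal sketch of a callback conforming to the new protocol (handler name and body are illustrative, not from the PR):

async def on_big_maps(datasource: 'IndexDatasource', big_maps: List[BigMapData], block: HeadBlockData) -> None:
    # The block now travels with each batch, so the dispatcher can persist
    # block.hash alongside the index level for later rollback verification.
    for big_map in big_maps:
        ...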


98 changes: 34 additions & 64 deletions src/dipdup/datasources/tzkt/datasource.py
@@ -320,7 +320,7 @@ def sync_level(self) -> Optional[int]:
     @property
     def block(self) -> HeadBlockData:
         if self._block is None:
-            raise RuntimeError('No message from `head` channel received')
+            raise RuntimeError('Attempt to access head block before the first message')
         return self._block

     async def get_similar_contracts(self, address: str, strict: bool = False) -> List[str]:
@@ -633,87 +633,57 @@ def _default_http_config(self) -> HTTPConfig:
             connection_limit=25,
         )

-    async def _on_operation_message(self, message: List[Dict[str, Any]]) -> None:
-        """Parse and emit raw operations from WS"""
+    async def _extract_message_data(self, channel: str, message: List[Any]) -> Any:
         for item in message:
-            current_level = item['state']
+            head_level = item['state']
             message_type = TzktMessageType(item['type'])
-            self._logger.info('Got operation message, %s, level %s', message_type, current_level)
+            self._logger.debug('`%s` message: %s', channel, message_type.name)

             if message_type == TzktMessageType.STATE:
-                self._sync_level = current_level
-                self._level = current_level
+                if self._sync_level != head_level:
+                    self._logger.info('Datasource level set to %s', head_level)
+                    self._sync_level = head_level
+                    self._level = head_level

             elif message_type == TzktMessageType.DATA:
-                self._level = current_level
-                operations = []
-                for operation_json in item['data']:
-                    operation = self.convert_operation(operation_json)
-                    if operation.status != 'applied':
-                        continue
-                    operations.append(operation)
-                if operations:
-                    self.emit_operations(operations, self.block)
+                self._level = head_level
+                yield item['data']

             elif message_type == TzktMessageType.REORG:
                 if self.level is None:
                     raise RuntimeError
-                self.emit_rollback(self.level, current_level)
+                self.emit_rollback(self.level, head_level)

             else:
                 raise NotImplementedError

-    async def _on_big_map_message(self, message: List[Dict[str, Any]]) -> None:
-        """Parse and emit raw big map diffs from WS"""
-        for item in message:
-            current_level = item['state']
-            message_type = TzktMessageType(item['type'])
-            self._logger.info('Got big map message, %s, level %s', message_type, current_level)
-
-            if message_type == TzktMessageType.STATE:
-                self._sync_level = current_level
-                self._level = current_level
-
-            elif message_type == TzktMessageType.DATA:
-                self._level = current_level
-                big_maps = []
-                for big_map_json in item['data']:
-                    big_map = self.convert_big_map(big_map_json)
-                    big_maps.append(big_map)
-                self.emit_big_maps(big_maps)
-
-            elif message_type == TzktMessageType.REORG:
-                if self.level is None:
-                    raise RuntimeError
-                self.emit_rollback(self.level, current_level)
+    async def _on_operation_message(self, message: List[Dict[str, Any]]) -> None:
+        """Parse and emit raw operations from WS"""
+        async for data in self._extract_message_data('operation', message):
+            operations = []
+            for operation_json in data:
+                operation = self.convert_operation(operation_json)
+                if operation.status != 'applied':
+                    continue
+                operations.append(operation)
+            if operations:
+                self.emit_operations(operations, self.block)

-            else:
-                raise NotImplementedError
+    async def _on_big_map_message(self, message: List[Dict[str, Any]]) -> None:
+        """Parse and emit raw big map diffs from WS"""
+        async for data in self._extract_message_data('big_map', message):
+            big_maps = []
+            for big_map_json in data:
+                big_map = self.convert_big_map(big_map_json)
+                big_maps.append(big_map)
+            self.emit_big_maps(big_maps)

     async def _on_head_message(self, message: List[Dict[str, Any]]) -> None:
-        for item in message:
-            current_level = item['state']
-            message_type = TzktMessageType(item['type'])
-            self._logger.info('Got block message, %s, level %s', message_type, current_level)
-
-            if message_type == TzktMessageType.STATE:
-                self._sync_level = current_level
-                self._level = current_level
-
-            elif message_type == TzktMessageType.DATA:
-                self._level = current_level
-                block_json = item['data']
-                block = self.convert_head_block(block_json)
-                self._block = block
-                self.emit_head(block)
-
-            elif message_type == TzktMessageType.REORG:
-                if self.level is None:
-                    raise RuntimeError
-                self.emit_rollback(self.level, current_level)
-
-            else:
-                raise NotImplementedError
+        async for data in self._extract_message_data('head', message):
+            block = self.convert_head_block(data)
+            self._block = block
+            self.emit_head(block)

     @classmethod
     def convert_operation(cls, operation_json: Dict[str, Any]) -> OperationData:
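
The three WS handlers above previously duplicated the STATE/DATA/REORG dispatch; `_extract_message_data` now owns that bookkeeping and yields only DATA payloads to its callers. A standalone sketch of the pattern (simplified, with a stub message type; not the full datasource):

from enum import Enum
from typing import Any, AsyncIterator, Dict, List


class TzktMessageType(Enum):
    STATE = 0
    DATA = 1
    REORG = 2


async def extract_message_data(channel: str, message: List[Dict[str, Any]]) -> AsyncIterator[Any]:
    for item in message:
        message_type = TzktMessageType(item['type'])
        if message_type == TzktMessageType.DATA:
            # Only payloads reach the caller; STATE/REORG handling lives
            # in one place instead of being copied into three handlers.
            yield item['data']


async def on_operation_message(message: List[Dict[str, Any]]) -> None:
    async for data in extract_message_data('operation', message):
        ...  # convert and emit operations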
22 changes: 13 additions & 9 deletions src/dipdup/dipdup.py
@@ -61,7 +61,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
         datasource_name = cast(TzktDatasourceConfig, index_config.datasource).name
         datasource = self._ctx.datasources[datasource_name]
         if not isinstance(datasource, TzktDatasource):
-            raise RuntimeError
+            raise RuntimeError(f'`{datasource_name}` is not a TzktDatasource')
         operation_index = OperationIndex(self._ctx, index_config, datasource)
         self._indexes[index_config.name] = operation_index
         await datasource.add_index(index_config)
@@ -70,7 +70,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
         datasource_name = cast(TzktDatasourceConfig, index_config.datasource).name
         datasource = self._ctx.datasources[datasource_name]
         if not isinstance(datasource, TzktDatasource):
-            raise RuntimeError
+            raise RuntimeError(f'`{datasource_name}` is not a TzktDatasource')
         big_map_index = BigMapIndex(self._ctx, index_config, datasource)
         self._indexes[index_config.name] = big_map_index
         await datasource.add_index(index_config)
@@ -111,22 +111,23 @@ async def dispatch_operations(self, datasource: TzktDatasource, operations: List
             if isinstance(index, OperationIndex) and index.datasource == datasource:
                 index.push(level, operations, block)

-    async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None:
+    async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData], block: HeadBlockData) -> None:
         assert len(set(op.level for op in big_maps)) == 1
         level = big_maps[0].level
         for index in self._indexes.values():
             if isinstance(index, BigMapIndex) and index.datasource == datasource:
-                index.push(level, big_maps)
+                index.push(level, big_maps, block)

     async def _rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
         logger = FormattedLogger(ROLLBACK_HANDLER)
         if from_level - to_level == 1:
             # NOTE: Single level rollbacks are processed at Index level.
-            # NOTE: Notify all indexes with rolled back datasource to skip next level and just verify it
+            # NOTE: Notify all indexes which use rolled back datasource to drop duplicated operations from the next block
             for index in self._indexes.values():
                 if index.datasource == datasource:
-                    # NOTE: Continue to rollback with handler
+                    if not isinstance(index, OperationIndex):
+                        self._logger.info('Single level rollback is not supported by `%s` indexes', index._config.kind)
+                        break
                     await index.single_level_rollback(from_level)
         else:
@@ -218,15 +219,18 @@ async def run(self, reindex: bool, oneshot: bool) -> None:
         async with AsyncExitStack() as stack:
             worker_tasks = []
             await stack.enter_async_context(tortoise_wrapper(url, models))

+            await self._initialize_database(reindex)
             for datasource in self._datasources.values():
                 await stack.enter_async_context(datasource)

+            # NOTE: on_configure hook fires after database and datasources are initialized but before Hasura is
+            await self._on_configure()
+
             if hasura_gateway:
                 await stack.enter_async_context(hasura_gateway)
                 worker_tasks.append(asyncio.create_task(hasura_gateway.configure()))

-            await self._initialize_database(reindex)
-            await self._configure()
-
             self._logger.info('Starting datasources')
             datasource_tasks = [] if oneshot else [asyncio.create_task(d.run()) for d in self._datasources.values()]

@@ -255,7 +259,7 @@ async def migrate_to_v11(self) -> None:
         await codegen.migrate_user_handlers_to_v11()
         self._finish_migration('1.1')

-    async def _configure(self) -> None:
+    async def _on_configure(self) -> None:
         """Run user-defined initial configuration handler"""
         configure_fn = self._config.get_configure_fn()
         await configure_fn(self._ctx)
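
Taken together, the run() changes pin down the startup ordering: the database schema and the user's on_configure hook are now ready before Hasura introspects the models. A condensed sketch of the resulting sequence (the `ctx` object and its attributes are stand-ins, not the real DipDup API):

import asyncio
from contextlib import AsyncExitStack


async def run_sketch(ctx) -> None:
    # A sketch of the fixed startup ordering under assumed attribute names.
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(ctx.tortoise_wrapper)    # database connection
        await ctx.initialize_database()                          # schema before datasources
        for datasource in ctx.datasources:
            await stack.enter_async_context(datasource)
        await ctx.on_configure()                                 # user hook before Hasura
        if ctx.hasura_gateway:
            await stack.enter_async_context(ctx.hasura_gateway)
            asyncio.create_task(ctx.hasura_gateway.configure())  # Hasura sees final models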
3 changes: 2 additions & 1 deletion src/dipdup/hasura.py
@@ -201,7 +201,8 @@ async def _get_views(self) -> List[str]:
             row[0]
             for row in (
                 await get_connection(None).execute_query(
-                    f"SELECT table_name FROM information_schema.views WHERE table_schema = '{self._database_config.schema_name}'"
+                    f"SELECT table_name FROM information_schema.views WHERE table_schema = '{self._database_config.schema_name}' UNION "
+                    f"SELECT matviewname as table_name FROM pg_matviews WHERE schemaname = '{self._database_config.schema_name}'"
                 )
             )[1]
         ]
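
The fix works because PostgreSQL's information_schema.views does not list materialized views; those live in the pg_matviews catalog, hence the UNION. The combined query spelled out standalone (schema name 'public' is illustrative):

query = (
    "SELECT table_name FROM information_schema.views WHERE table_schema = 'public' "
    "UNION "
    "SELECT matviewname AS table_name FROM pg_matviews WHERE schemaname = 'public'"
)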
95 changes: 57 additions & 38 deletions src/dipdup/index.py
@@ -42,22 +42,23 @@ def __init__(self, ctx: DipDupContext, config: IndexConfigTemplateT, datasource:
     def datasource(self) -> TzktDatasource:
         return self._datasource

-    async def get_state(self) -> State:
-        """Get state of index containing current level and config hash"""
+    @property
+    def state(self) -> State:
         if self._state is None:
-            await self._initialize_index_state()
-        return cast(State, self._state)
+            raise RuntimeError('Index state is not initialized')
+        return self._state

     async def process(self) -> None:
-        state = await self.get_state()
+        await self._initialize_index_state()

         if self._config.last_block:
             last_level = self._config.last_block
             await self._synchronize(last_level, cache=True)
         elif self._datasource.sync_level is None:
             self._logger.info('Datasource is not active, sync to the latest block')
             last_level = (await self._datasource.get_head_block()).level
             await self._synchronize(last_level)
-        elif self._datasource.sync_level > state.level:
+        elif self._datasource.sync_level > self.state.level:
             self._logger.info('Index is behind datasource, sync to datasource level')
             self._queue.clear()
             last_level = self._datasource.sync_level
@@ -74,25 +75,25 @@ async def _process_queue(self) -> None:
         ...

     async def _enter_sync_state(self, last_level: int) -> Optional[int]:
-        state = await self.get_state()
-        first_level = state.level
+        first_level = self.state.level
         if first_level == last_level:
             return None
         if first_level > last_level:
             raise RuntimeError(f'Attempt to synchronize index from level {first_level} to level {last_level}')
         self._logger.info('Synchronizing index to level %s', last_level)
-        state.hash = None  # type: ignore
-        await state.save()
+        self.state.hash = None  # type: ignore
+        await self.state.save()
         return first_level

     async def _exit_sync_state(self, last_level: int) -> None:
         self._logger.info('Index is synchronized to level %s', last_level)
-        state = await self.get_state()
-        state.level = last_level  # type: ignore
-        await state.save()
+        self.state.level = last_level  # type: ignore
+        await self.state.save()

     async def _initialize_index_state(self) -> None:
-        self._logger.info('Getting index state')
+        if self._state:
+            return
+        self._logger.info('Initializing index state')
         index_config_hash = self._config.hash()
         state = await State.get_or_none(
             index_name=self._config.name,
@@ -138,8 +139,21 @@ def push(self, level: int, operations: List[OperationData], block: Optional[Head
         self._queue.append((level, operations, block))

     async def single_level_rollback(self, from_level: int) -> None:
-        """Ensure next arrived block is the same as rolled back one"""
-        self._rollback_level = from_level
+        """Ensure next arrived block is the same as rolled back one
+
+        Called by IndexDispatcher in case index datasource reported a rollback.
+        """
+        if self._rollback_level:
+            raise RuntimeError('Already in rollback state')
+
+        await self._initialize_index_state()
+        if self.state.level < from_level:
+            self._logger.info('Index level is lower than rollback level, ignoring')
+        elif self.state.level == from_level:
+            self._logger.info('Single level rollback has been triggered')
+            self._rollback_level = from_level
+        else:
+            raise RuntimeError('Index level is higher than rollback level')

     async def _process_queue(self) -> None:
         if not self._queue:
@@ -183,22 +197,26 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
         await self._exit_sync_state(last_level)

     async def _process_level_operations(self, level: int, operations: List[OperationData], block: Optional[HeadBlockData] = None) -> None:
-        state = await self.get_state()
-        if level < state.level:
-            raise RuntimeError(f'Level of operation batch is lower than index state level: {level} < {state.level}')
+        if level < self.state.level:
+            raise RuntimeError(f'Level of operation batch is lower than index state level: {level} < {self.state.level}')

         if self._rollback_level:
-            if state.level != self._rollback_level:
-                raise RuntimeError(f'Rolling back to level {self._rollback_level}, state level {state.level}')
-            if level != self._rollback_level:
-                raise RuntimeError(f'Rolling back to level {self._rollback_level}, got operations of level {level}')
+            levels = {
+                'operations': level,
+                'rollback': self._rollback_level,
+                'index': self.state.level,
+                'block': block.level if block else level,
+            }
+            if len(set(levels.values())) != 1:
+                levels_repr = ', '.join(f'{k}={v}' for k, v in levels.items())
+                raise RuntimeError(f'Index is in a rollback state, but received operation batch with different levels: {levels_repr}')

             self._logger.info('Rolling back to previous level, verifying processed operations')
             expected_hashes = set(self._last_hashes)
             received_hashes = set([op.hash for op in operations])
             reused_hashes = received_hashes & expected_hashes
             if reused_hashes != expected_hashes:
-                self._logger.warning('Attempted a single level rollback but arrived block differs from processed one')
+                self._logger.warning('Attempted a single level rollback, but arrived block differs from processed one')
                 await self._ctx.reindex()

             self._rollback_level = None
@@ -212,10 +230,10 @@ async def _process_level_operations(self, level: int, operations: List[Operation
             self._logger.info('Processing %s operations of level %s', len(operations), level)
             await self._process_operations(operations)

-            state.level = level  # type: ignore
+            self.state.level = level  # type: ignore
             if block:
-                state.hash = block.hash  # type: ignore
-            await state.save()
+                self.state.hash = block.hash  # type: ignore
+            await self.state.save()

     async def _match_operation(self, pattern_config: OperationHandlerPatternConfigT, operation: OperationData) -> bool:
         """Match single operation with pattern"""
@@ -407,19 +425,19 @@ class BigMapIndex(Index):

     def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
         super().__init__(ctx, config, datasource)
-        self._queue: Deque[Tuple[int, List[BigMapData]]] = deque()
+        self._queue: Deque[Tuple[int, List[BigMapData], Optional[HeadBlockData]]] = deque()

-    def push(self, level: int, big_maps: List[BigMapData]):
-        self._queue.append((level, big_maps))
+    def push(self, level: int, big_maps: List[BigMapData], block: Optional[HeadBlockData] = None):
+        self._queue.append((level, big_maps, block))

     async def _process_queue(self):
         if not self._queue:
             return
         self._logger.info('Processing websocket queue')
         with suppress(IndexError):
             while True:
-                level, big_maps = self._queue.popleft()
-                await self._process_level_big_maps(level, big_maps)
+                level, big_maps, block = self._queue.popleft()
+                await self._process_level_big_maps(level, big_maps, block)

     async def _synchronize(self, last_level: int, cache: bool = False) -> None:
         """Fetch operations via Fetcher and pass to message callback"""
@@ -446,17 +464,18 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:

         await self._exit_sync_state(last_level)

-    async def _process_level_big_maps(self, level: int, big_maps: List[BigMapData]):
-        state = await self.get_state()
-        if state.level >= level:
-            raise RuntimeError(state.level, level)
+    async def _process_level_big_maps(self, level: int, big_maps: List[BigMapData], block: Optional[HeadBlockData] = None):
+        if level < self.state.level:
+            raise RuntimeError(f'Level of operation batch is lower than index state level: {level} < {self.state.level}')

         async with in_global_transaction():
             self._logger.info('Processing %s big map diffs of level %s', len(big_maps), level)
             await self._process_big_maps(big_maps)

-            state.level = level  # type: ignore
-            await state.save()
+            self.state.level = level  # type: ignore
+            if block:
+                self.state.hash = block.hash  # type: ignore
+            await self.state.save()

     async def _match_big_map(self, handler_config: BigMapHandlerConfig, big_map: BigMapData) -> bool:
         """Match single big map diff with pattern"""
1 change: 1 addition & 0 deletions tests/integration_tests/test_rollback.py
@@ -34,6 +34,7 @@ def _get_operation(hash_: str, level: int) -> OperationData:

 # NOTE: Skip synchronization
 async def operation_index_process(self: OperationIndex):
+    await self._initialize_index_state()
     await self._process_queue()
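
For context, this shim stands in for OperationIndex.process in the rollback tests, draining the queue without a full synchronization pass; with the state property now lazy, it must initialize the index state explicitly first. A hedged sketch of how such a shim could be wired (patch target path assumed, not verbatim from the suite):

from unittest.mock import patch

with patch('dipdup.index.OperationIndex.process', operation_index_process):
    ...  # drive the rollback scenario against the patched index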