Skip to content

Commit

Permalink
Fix matching in OperationCache (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jun 1, 2021
1 parent f420a93 commit 37a7b99
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
16 changes: 10 additions & 6 deletions src/dipdup/datasources/tzkt/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
OperationHandlerTransactionPatternConfig,
OperationIndexConfig,
)
from dipdup.exceptions import ConfigurationError
from dipdup.models import BigMapData, OperationData

OperationGroup = namedtuple('OperationGroup', ('hash', 'counter'))
Expand All @@ -24,15 +25,18 @@ def __init__(self) -> None:
super().__init__()
self._logger = logging.getLogger(__name__)
self._level: Optional[int] = None
self._indexes: Dict[str, OperationIndexConfig] = {}
self._indexes: List[OperationIndexConfig] = []
self._addresses: List[str] = []
self._operations: Dict[OperationGroup, List[OperationData]] = {}

async def add_index(self, index_config: OperationIndexConfig) -> None:
self._logger.debug('Adding index %s to cache', index_config)
for contract in index_config.contract_configs:
if contract.address in self._indexes:
raise RuntimeError(f'Address `{contract.address}` used in multiple indexes')
self._indexes[contract.address] = index_config
if contract.address in self._addresses:
raise ConfigurationError(f'Address `{contract.address}` used in multiple indexes')
self._addresses.append(contract.address)

self._logger.debug('Adding index %s to cache', index_config)
self._indexes.append(index_config)

async def add(self, operation: OperationData):
self._logger.debug('Adding operation %s to cache (%s, %s)', operation.id, operation.hash, operation.counter)
Expand Down Expand Up @@ -91,7 +95,7 @@ async def on_match(
self._logger.debug('Matching %s', key)
matched = False

for index_config in self._indexes.values():
for index_config in self._indexes:
for handler_config in index_config.handlers:
operation_idx = 0
pattern_idx = 0
Expand Down
10 changes: 8 additions & 2 deletions src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ async def run(self):
return


def dedup_operations(operations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return sorted(
list(({op['id']: op for op in operations}).values()),
key=lambda op: op['id'],
)


class OperationFetcher:
def __init__(
self,
Expand Down Expand Up @@ -250,8 +257,7 @@ async def fetch_operations_by_level(self):
while self._head <= head:
if self._head in self._operations:
operations = self._operations.pop(self._head)
operations = sorted(list(({op['id']: op for op in operations}).values()), key=lambda op: op['id'])
yield self._head, operations
yield self._head, dedup_operations(operations)
self._head += 1

if all(list(self._fetched.values())):
Expand Down
19 changes: 18 additions & 1 deletion tests/test_dipdup/test_datasources/test_tzkt/test_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
OperationIndexConfig,
OperationType,
)
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.datasources.tzkt.datasource import TzktDatasource, dedup_operations
from dipdup.models import IndexType, OperationData, OperationHandlerContext, State, TransactionContext
from dipdup.utils import tortoise_wrapper

Expand Down Expand Up @@ -195,3 +195,20 @@ async def test_on_operation_match_with_storage(self):
callback_mock.await_args[0][1].storage.proposals['e710c1a066bbbf73692168e783607996785260cec4d60930579827298493b8b9'],
Proposals,
)

async def test_dedup_operations(self) -> None:
operations = [
{'id': 5},
{'id': 3},
{'id': 3},
{'id': 1},
]
operations = dedup_operations(operations)
self.assertEqual(
[
{'id': 1},
{'id': 3},
{'id': 5},
],
operations,
)

0 comments on commit 37a7b99

Please sign in to comment.