Skip to content

Commit

Permalink
auto skip history for balances index
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Bobrikov committed Oct 17, 2023
1 parent e927b28 commit a5557d4
Show file tree
Hide file tree
Showing 17 changed files with 39 additions and 20 deletions.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
8 changes: 4 additions & 4 deletions src/dipdup/datasources/tezos_tzkt.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,8 +916,8 @@ async def get_token_balances(
self,
token_addresses: set[str],
token_ids: set[int],
first_level: int,
last_level: int,
first_level: int | None = None,
last_level: int | None = None,
offset: int | None = None,
limit: int | None = None,
) -> tuple[TzktTokenBalanceData, ...]:
Expand All @@ -941,8 +941,8 @@ async def iter_token_balances(
self,
token_addresses: set[str],
token_ids: set[int],
first_level: int,
last_level: int,
first_level: int | None = None,
last_level: int | None = None,
) -> AsyncIterator[tuple[TzktTokenBalanceData, ...]]:
async for batch in self._iter_batches(
self.get_token_balances,
Expand Down
38 changes: 22 additions & 16 deletions src/dipdup/indexes/tezos_tzkt_token_balances/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,30 @@ def _create_fetcher(self, first_level: int, last_level: int) -> TokenBalanceFetc
)

async def _synchronize(self, sync_level: int) -> None:
"""Fetch token balances via Fetcher and pass to message callback"""
index_level = await self._enter_sync_state(sync_level)
if index_level is None:
return

first_level = index_level + 1
self._logger.info('Fetching token balances from level %s to %s', first_level, sync_level)
fetcher = self._create_fetcher(first_level, sync_level)

async for level, token_balances in fetcher.fetch_by_level():
with ExitStack() as stack:
if Metrics.enabled:
Metrics.set_levels_to_sync(self._config.name, sync_level - level)
stack.enter_context(Metrics.measure_level_sync_duration())
await self._process_level_token_balances(token_balances, sync_level)

await self._synchronize_actual(sync_level)
await self._exit_sync_state(sync_level)

async def _synchronize_actual(self, head_level: int) -> None:
"""Retrieve data for last level"""
# TODO: think about logging and metrics
addresses, token_ids = set(), set()
for handler in self._config.handlers:
if handler.contract and handler.contract.address is not None:
addresses.add(handler.contract.address)
if handler.token_id is not None:
token_ids.add(handler.token_id)

async with self._ctx.transactions.in_transaction(head_level, head_level, self.name):
# NOTE: in case of desynchronization filter balances for head_level
async for balances_batch in self._datasource.iter_token_balances(
addresses, token_ids, last_level=head_level
):
matched_handlers = match_token_balances(self._config.handlers, balances_batch)
for handler_config, matched_balance_data in matched_handlers:
await self._call_matched_handler(handler_config, matched_balance_data)

await self._update_state(level=head_level)

async def _process_level_token_balances(
self,
token_balances: tuple[TzktTokenBalanceData, ...],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from decimal import Decimal
from {{ project.package }} import models as models
from dipdup.context import HandlerContext
from dipdup.models.tezos_tzkt import TzktTokenBalanceData


async def on_balance_update(
ctx: HandlerContext,
token_balance: TzktTokenBalanceData,
) -> None:
holder, _ = await models.Holder.get_or_create(address=token_balance.contract_address)
holder.balance = Decimal(token_balance.balance_value or 0) / (10**8)
await holder.save()

0 comments on commit a5557d4

Please sign in to comment.