From 1039ca3856951945e3b07131d0199c827107aa38 Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Mon, 23 Dec 2024 11:26:09 -0300
Subject: [PATCH] update fetcher logic

---
 src/dipdup/datasources/substrate_node.py      |   2 +-
 .../indexes/substrate_events/fetcher.py       | 102 ++++++++++++++++--
 2 files changed, 91 insertions(+), 13 deletions(-)

diff --git a/src/dipdup/datasources/substrate_node.py b/src/dipdup/datasources/substrate_node.py
index 18af85eb1..83183cc11 100644
--- a/src/dipdup/datasources/substrate_node.py
+++ b/src/dipdup/datasources/substrate_node.py
@@ -92,7 +92,7 @@ def save_file(self) -> None:
 
 class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateNodeDatasourceConfig]):
     _default_http_config = HttpConfig(
-        batch_size=5,
+        batch_size=10,
     )
 
     def __init__(self, config: SubstrateNodeDatasourceConfig) -> None:
diff --git a/src/dipdup/indexes/substrate_events/fetcher.py b/src/dipdup/indexes/substrate_events/fetcher.py
index 9e840c249..83c70ddee 100644
--- a/src/dipdup/indexes/substrate_events/fetcher.py
+++ b/src/dipdup/indexes/substrate_events/fetcher.py
@@ -60,16 +60,94 @@ async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventD
     async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]:
+        """Yield one tuple of events per block, in level order.
+
+        Levels `_first_level`..`_last_level` (inclusive) pass through three concurrent
+        stages (hashes, headers, events) connected by queues.
+        """
         node = self.get_random_node()
         batch_size = node._http_config.batch_size
+        queue_limit = 50
 
-        for batch_start in range(self._first_level, self._last_level, batch_size):
-            batch_end = min(batch_start + batch_size, self._last_level)
-
-            block_hashes = await asyncio.gather(
-                *(node.get_block_hash(level) for level in range(batch_start, batch_end)),
-            )
-            block_headers = await asyncio.gather(
-                *(node.get_block_header(hash_) for hash_ in block_hashes),
-            )
-            for block_hash, block_header in zip(block_hashes, block_headers, strict=True):
-                event_dicts = await self.get_random_node().get_events(block_hash)
-                yield tuple(SubstrateEventData.from_node(event_dict, block_header) for event_dict in event_dicts)
+        # End-of-stream marker. Each stage forwards it downstream once its input is
+        # drained, so completion is never guessed from momentary queue sizes.
+        sentinel = object()
+
+        # Intermediate queues are bounded so a fast stage blocks in `put()` instead of
+        # buffering the whole level range. `events` stays unbounded because the failure
+        # callback below must be able to enqueue without blocking.
+        queues = {  # type: ignore[var-annotated]
+            'levels': asyncio.Queue(),
+            'hashes': asyncio.Queue(maxsize=queue_limit),
+            'headers': asyncio.Queue(maxsize=queue_limit),
+            'events': asyncio.Queue(),
+        }
+
+        for level in range(self._first_level, self._last_level + 1):
+            queues['levels'].put_nowait(level)
+        queues['levels'].put_nowait(sentinel)
+
+        async def _stage(source, target, fetch):  # type: ignore[no-untyped-def]
+            """Drain `source` in batches of `batch_size`, run `fetch` on each, feed `target`."""
+            batch = []
+            drained = False
+            while not drained:
+                item = await queues[source].get()
+                if item is sentinel:
+                    drained = True
+                else:
+                    batch.append(item)
+                # Flush on a full batch, or on a partial one when the input has ended.
+                if batch and (drained or len(batch) >= batch_size):
+                    for result in await fetch(batch):
+                        await queues[target].put(result)
+                    batch = []
+            await queues[target].put(sentinel)
+
+        async def _fetch_hashes(levels):  # type: ignore[no-untyped-def]
+            return await asyncio.gather(*(node.get_block_hash(level) for level in levels))
+
+        async def _fetch_headers(hashes):  # type: ignore[no-untyped-def]
+            return await asyncio.gather(*(node.get_block_header(hash_) for hash_ in hashes))
+
+        async def _fetch_events(headers):  # type: ignore[no-untyped-def]
+            block_events = await asyncio.gather(*(node.get_events(header['hash']) for header in headers))
+            return zip(headers, block_events, strict=True)
+
+        async def _log_loop():  # type: ignore[no-untyped-def]
+            while True:
+                await asyncio.sleep(1)
+                self._logger.debug(
+                    'queues: levels=%d hashes=%d headers=%d events=%d',
+                    queues['levels'].qsize(),
+                    queues['hashes'].qsize(),
+                    queues['headers'].qsize(),
+                    queues['events'].qsize(),
+                )
+
+        def _forward_failure(task):  # type: ignore[no-untyped-def]
+            # Wake up the consumer instead of leaving it blocked on `events.get()`.
+            if not task.cancelled() and task.exception() is not None:
+                queues['events'].put_nowait(task.exception())
+
+        workers = (
+            asyncio.create_task(_stage('levels', 'hashes', _fetch_hashes)),  # type: ignore[no-untyped-call]
+            asyncio.create_task(_stage('hashes', 'headers', _fetch_headers)),  # type: ignore[no-untyped-call]
+            asyncio.create_task(_stage('headers', 'events', _fetch_events)),  # type: ignore[no-untyped-call]
+        )
+        for worker in workers:
+            worker.add_done_callback(_forward_failure)
+        # The logger never finishes on its own, so it is kept apart from `workers`.
+        log_task = asyncio.create_task(_log_loop())  # type: ignore[no-untyped-call]
+
+        try:
+            while True:
+                item = await queues['events'].get()
+                if item is sentinel:
+                    break
+                if isinstance(item, BaseException):
+                    raise item
+                header, events = item
+                yield tuple(SubstrateEventData.from_node(event, header) for event in events)
+        finally:
+            # Also runs when the consumer stops iterating early or a worker failed.
+            for task in (*workers, log_task):
+                task.cancel()