Skip to content

Commit

Permalink
fetcher optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout committed Dec 23, 2024
1 parent d0fd532 commit a3bc564
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/dipdup/datasources/substrate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def save_file(self) -> None:

class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateNodeDatasourceConfig]):
_default_http_config = HttpConfig(
batch_size=20,
batch_size=5,
)

def __init__(self, config: SubstrateNodeDatasourceConfig) -> None:
Expand Down
21 changes: 16 additions & 5 deletions src/dipdup/indexes/substrate_events/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from collections.abc import AsyncIterator

from dipdup.datasources.substrate_node import SubstrateNodeDatasource
Expand Down Expand Up @@ -57,8 +58,18 @@ async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventD
yield level, events

async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]:
for level in range(self._first_level, self._last_level):
block_hash = await self.get_random_node().get_block_hash(level)
event_dicts = await self.get_random_node().get_events(block_hash)
block_header = await self.get_random_node().get_block_header(block_hash)
yield tuple(SubstrateEventData.from_node(event_dict, block_header) for event_dict in event_dicts)
node = self.get_random_node()
batch_size = node._http_config.batch_size

for batch_start in range(self._first_level, self._last_level, batch_size):
batch_end = min(batch_start + batch_size, self._last_level)

block_hashes = await asyncio.gather(
*(node.get_block_hash(level) for level in range(batch_start, batch_end)),
)
block_headers = await asyncio.gather(
*(node.get_block_header(hash_) for hash_ in block_hashes),
)
for block_hash, block_header in zip(block_hashes, block_headers, strict=True):
event_dicts = await self.get_random_node().get_events(block_hash)
yield tuple(SubstrateEventData.from_node(event_dict, block_header) for event_dict in event_dicts)

0 comments on commit a3bc564

Please sign in to comment.