Skip to content

Commit

Permalink
update fetcher logic
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout committed Dec 23, 2024
1 parent a3bc564 commit 1039ca3
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/dipdup/datasources/substrate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def save_file(self) -> None:

class SubstrateNodeDatasource(JsonRpcDatasource[SubstrateNodeDatasourceConfig]):
_default_http_config = HttpConfig(
batch_size=5,
batch_size=10,
)

def __init__(self, config: SubstrateNodeDatasourceConfig) -> None:
Expand Down
119 changes: 107 additions & 12 deletions src/dipdup/indexes/substrate_events/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,111 @@ async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubstrateEventD
async def fetch_events(self) -> AsyncIterator[tuple[SubstrateEventData, ...]]:
node = self.get_random_node()
batch_size = node._http_config.batch_size
queue_limit = 50

for batch_start in range(self._first_level, self._last_level, batch_size):
batch_end = min(batch_start + batch_size, self._last_level)

block_hashes = await asyncio.gather(
*(node.get_block_hash(level) for level in range(batch_start, batch_end)),
)
block_headers = await asyncio.gather(
*(node.get_block_header(hash_) for hash_ in block_hashes),
)
for block_hash, block_header in zip(block_hashes, block_headers, strict=True):
event_dicts = await self.get_random_node().get_events(block_hash)
yield tuple(SubstrateEventData.from_node(event_dict, block_header) for event_dict in event_dicts)
queues = { # type: ignore[var-annotated]
'levels': asyncio.Queue(),
'hashes': asyncio.Queue(),
'headers': asyncio.Queue(),
'events': asyncio.Queue(),
}

for level in range(self._first_level, self._last_level + 1):
await queues['levels'].put(level)

async def _hashes_loop(): # type: ignore[no-untyped-def]
async def _batch(levels): # type: ignore[no-untyped-def]
block_hashes = await asyncio.gather(
*(node.get_block_hash(level) for level in levels),
)
for block_hash in block_hashes:
await queues['hashes'].put(block_hash)
if queues['hashes'].qsize() >= queue_limit:
await asyncio.sleep(1)

batch = []
while queues['levels'].qsize() > 0:
batch.append(await queues['levels'].get())
if len(batch) >= batch_size:
await _batch(batch) # type: ignore[no-untyped-call]
batch = []

if batch:
await _batch(batch) # type: ignore[no-untyped-call]

async def _headers_loop(): # type: ignore[no-untyped-def]
async def _batch(hashes): # type: ignore[no-untyped-def]
block_headers = await asyncio.gather(
*(node.get_block_header(hash_) for hash_ in hashes),
)
for block_header in block_headers:
await queues['headers'].put(block_header)
if queues['headers'].qsize() >= queue_limit:
await asyncio.sleep(1)

batch = []
while True:
block_hash = await queues['hashes'].get()
batch.append(block_hash)
if len(batch) >= batch_size:
await _batch(batch) # type: ignore[no-untyped-call]
batch = []

if queues['levels'].qsize() == 0 and queues['hashes'].qsize() == 0:
break

if batch:
await _batch(batch) # type: ignore[no-untyped-call]

async def _events_loop(): # type: ignore[no-untyped-def]
async def _batch(headers): # type: ignore[no-untyped-def]
block_events = await asyncio.gather(
*(node.get_events(header['hash']) for header in headers),
)
for header, events in zip(headers, block_events, strict=True):
await queues['events'].put((header, events))

batch = []
while True:
block_header = await queues['headers'].get()
batch.append(block_header)
if len(batch) >= batch_size:
await _batch(batch) # type: ignore[no-untyped-call]
batch = []

if queues['levels'].qsize() == 0 and queues['hashes'].qsize() == 0 and queues['headers'].qsize() == 0:
break

if batch:
await _batch(batch) # type: ignore[no-untyped-call]

async def _log_loop(): # type: ignore[no-untyped-def]
while True:
await asyncio.sleep(1)
self._logger.debug(
'queues: levels=%d hashes=%d headers=%d events=%d',
queues['levels'].qsize(),
queues['hashes'].qsize(),
queues['headers'].qsize(),
queues['events'].qsize(),
)

tasks = (
asyncio.create_task(_hashes_loop()), # type: ignore[no-untyped-call]
asyncio.create_task(_headers_loop()), # type: ignore[no-untyped-call]
asyncio.create_task(_events_loop()), # type: ignore[no-untyped-call]
asyncio.create_task(_log_loop()), # type: ignore[no-untyped-call]
)

while True:
if sum(queues[q].qsize() for q in queues) == 0 and all(t.done() for t in tasks):
break

for t in tasks:
if t.done():
await t

header, events = await queues['events'].get()
yield tuple(SubstrateEventData.from_node(event, header) for event in events)

tasks[-1].cancel()

0 comments on commit 1039ca3

Please sign in to comment.