From 3b3ae01eff1744d25d49ed23291753f5178863c6 Mon Sep 17 00:00:00 2001 From: Platon Floria Date: Mon, 20 May 2024 19:23:11 +0800 Subject: [PATCH 1/5] feat: get logs batching --- fastlane_bot/config/constants.py | 13 +++++++ fastlane_bot/events/event_gatherer.py | 51 ++++++++++++++++++++++----- main.py | 7 +++- 3 files changed, 61 insertions(+), 10 deletions(-) diff --git a/fastlane_bot/config/constants.py b/fastlane_bot/config/constants.py index ad6641ac0..ad855d546 100644 --- a/fastlane_bot/config/constants.py +++ b/fastlane_bot/config/constants.py @@ -34,3 +34,16 @@ SECTA_V3_NAME = "secta_v3" METAVAULT_V3_NAME = "metavault_v3" ZERO_ADDRESS = "0x0000000000000000000000000000000000000000" + +BLOCK_CHUNK_SIZE_MAP = { + "ethereum": 0, + "polygon": 0, + "polygon_zkevm": 0, + "arbitrum_one": 0, + "optimism": 0, + "coinbase_base": 0, + "fantom": 5000, + "mantle": 0, + "linea": 0, + "sei": 0, +} diff --git a/fastlane_bot/events/event_gatherer.py b/fastlane_bot/events/event_gatherer.py index 1659f5807..382343ee2 100644 --- a/fastlane_bot/events/event_gatherer.py +++ b/fastlane_bot/events/event_gatherer.py @@ -1,5 +1,5 @@ from itertools import chain -from typing import Dict +from typing import Dict, List import asyncio import nest_asyncio @@ -7,6 +7,7 @@ from web3 import AsyncWeb3 from web3.contract import Contract +from fastlane_bot.config.constants import BLOCK_CHUNK_SIZE_MAP from .interfaces.subscription import Subscription from .exchanges.base import Exchange @@ -21,6 +22,7 @@ class EventGatherer: def __init__( self, + blockchain: str, w3: AsyncWeb3, exchanges: Dict[str, Exchange], event_contracts: Dict[str, Contract], @@ -30,6 +32,7 @@ def __init__( manager: The Manager object w3: The connected AsyncWeb3 object. """ + self._blockchain = blockchain self._w3 = w3 self._subscriptions = [] unique_topics = set() @@ -48,16 +51,46 @@ def get_all_events(self, from_block: int, to_block: int): from_block_ = 0 else: from_block_ = from_block - coroutines.append(self._get_events_for_topic(from_block_, to_block, sub)) + coroutines.append(self._get_events_for_subscription(from_block_, to_block, sub)) results = asyncio.get_event_loop().run_until_complete(asyncio.gather(*coroutines)) return list(chain.from_iterable(results)) - async def _get_events_for_topic(self, from_block: int, to_block: int, subscription: Subscription): - events = await self._w3.eth.get_logs(filter_params={ - "fromBlock": from_block, - "toBlock": to_block, - "topics": [subscription.topic] - }) - return [subscription.parse_log(event) for event in events] + async def _get_events_for_subscription(self, from_block: int, to_block: int, subscription: Subscription): + return [subscription.parse_log(log) for log in await self._get_logs_for_topics(from_block, to_block, [subscription.topic])] + async def _get_logs_for_topics(self, from_block: int, to_block: int, topics: List[str]): + chunk_size = BLOCK_CHUNK_SIZE_MAP[self._blockchain] + if chunk_size > 0: + return await self._get_logs_iterative(from_block, to_block, topics, chunk_size) + else: + return await self._get_logs_recursive(from_block, to_block, topics) + async def _get_logs_iterative(self, from_block: int, to_block: int, topics: List[str], chunk_size: int) -> list: + block_numbers = list(range(from_block, to_block + 1, chunk_size)) + [to_block + 1] + log_lists = await asyncio.gather([ + self._w3.eth.get_logs(filter_params={ + "fromBlock": r[0], + "toBlock": r[1], + "topics": topics + }) + for r in zip(block_numbers, map(lambda n: n - 1, block_numbers[1:])) + ]) + return [log for log_list in log_lists for log in log_list] + + async def _get_logs_recursive(self, from_block: int, to_block: int, topics: List[str]) -> list: + if from_block <= to_block: + try: + return await self._w3.eth.get_logs(filter_params={ + "fromBlock": from_block, + "toBlock": to_block, + "topics": topics + }) + except Exception as e: + assert "eth_getLogs" in str(e), str(e) + mid_block = (from_block + to_block) // 2 + log_lists = await asyncio.gather( + self._get_logs_recursive(from_block, mid_block, topics), + self._get_logs_recursive(mid_block + 1, to_block, topics) + ) + return [log for log_list in log_lists for log in log_list] + return [] diff --git a/main.py b/main.py index fcc722efe..37a8d7490 100644 --- a/main.py +++ b/main.py @@ -306,7 +306,12 @@ def run(mgr, args, tenderly_uri=None) -> None: mainnet_uri = mgr.cfg.w3.provider.endpoint_uri handle_static_pools_update(mgr) - event_gatherer = EventGatherer(w3=mgr.w3_async, exchanges=mgr.exchanges, event_contracts=mgr.event_contracts) + event_gatherer = EventGatherer( + blockchain=args.blockchain, + w3=mgr.w3_async, + exchanges=mgr.exchanges, + event_contracts=mgr.event_contracts + ) pool_finder = PoolFinder( carbon_forks=mgr.cfg.network.CARBON_V1_FORKS, From 0d651521f91fa99ff30ec4fa172749dbc42a1b3c Mon Sep 17 00:00:00 2001 From: Platon Floria Date: Tue, 21 May 2024 16:33:54 +0800 Subject: [PATCH 2/5] fix: infinite recursion when fetching logs --- fastlane_bot/events/event_gatherer.py | 13 +++++++------ run_blockchain_terraformer.py | 9 +++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/fastlane_bot/events/event_gatherer.py b/fastlane_bot/events/event_gatherer.py index 382343ee2..e2016ffa0 100644 --- a/fastlane_bot/events/event_gatherer.py +++ b/fastlane_bot/events/event_gatherer.py @@ -87,10 +87,11 @@ async def _get_logs_recursive(self, from_block: int, to_block: int, topics: List }) except Exception as e: assert "eth_getLogs" in str(e), str(e) - mid_block = (from_block + to_block) // 2 - log_lists = await asyncio.gather( - self._get_logs_recursive(from_block, mid_block, topics), - self._get_logs_recursive(mid_block + 1, to_block, topics) - ) - return [log for log_list in log_lists for log in log_list] + if from_block < to_block: + mid_block = (from_block + to_block) // 2 + log_lists = await asyncio.gather( + self._get_logs_recursive(from_block, mid_block, topics), + self._get_logs_recursive(mid_block + 1, to_block, topics) + ) + return [log for log_list in log_lists for log in log_list] return [] diff --git a/run_blockchain_terraformer.py b/run_blockchain_terraformer.py index 854119d40..350affee0 100644 --- a/run_blockchain_terraformer.py +++ b/run_blockchain_terraformer.py @@ -680,10 +680,11 @@ def get_events_recursive(get_logs: any, start_block: int, end_block: int) -> lis return get_logs(fromBlock=start_block, toBlock=end_block) except Exception as e: assert "eth_getLogs" in str(e), str(e) - mid_block = (start_block + end_block) // 2 - event_list_1 = get_events_recursive(get_logs, start_block, mid_block) - event_list_2 = get_events_recursive(get_logs, mid_block + 1, end_block) - return event_list_1 + event_list_2 + if start_block < end_block: + mid_block = (start_block + end_block) // 2 + event_list_1 = get_events_recursive(get_logs, start_block, mid_block) + event_list_2 = get_events_recursive(get_logs, mid_block + 1, end_block) + return event_list_1 + event_list_2 return [] From 605c0d361b0a5b8eeff37ce6b0854108a620d2b0 Mon Sep 17 00:00:00 2001 From: Platon Floria Date: Tue, 21 May 2024 17:53:36 +0800 Subject: [PATCH 3/5] fix: print exception when failed to collect logs from blocks --- fastlane_bot/events/event_gatherer.py | 5 ++++- run_blockchain_terraformer.py | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/fastlane_bot/events/event_gatherer.py b/fastlane_bot/events/event_gatherer.py index e2016ffa0..9c7b67a33 100644 --- a/fastlane_bot/events/event_gatherer.py +++ b/fastlane_bot/events/event_gatherer.py @@ -1,7 +1,8 @@ +import asyncio +import traceback from itertools import chain from typing import Dict, List -import asyncio import nest_asyncio from web3 import AsyncWeb3 @@ -94,4 +95,6 @@ async def _get_logs_recursive(self, from_block: int, to_block: int, topics: List self._get_logs_recursive(mid_block + 1, to_block, topics) ) return [log for log_list in log_lists for log in log_list] + else: + traceback.print_exc(e) return [] diff --git a/run_blockchain_terraformer.py b/run_blockchain_terraformer.py index 350affee0..259c93532 100644 --- a/run_blockchain_terraformer.py +++ b/run_blockchain_terraformer.py @@ -1,4 +1,5 @@ import math +import traceback from typing import Tuple, List, Dict import pandas as pd @@ -685,6 +686,8 @@ def get_events_recursive(get_logs: any, start_block: int, end_block: int) -> lis event_list_1 = get_events_recursive(get_logs, start_block, mid_block) event_list_2 = get_events_recursive(get_logs, mid_block + 1, end_block) return event_list_1 + event_list_2 + else: + traceback.print_exc(e) return [] From 965269c4ee61ab7ea2336e7826d24c388a624291 Mon Sep 17 00:00:00 2001 From: Platon Floria Date: Tue, 21 May 2024 18:26:36 +0800 Subject: [PATCH 4/5] fix: error handling --- fastlane_bot/events/event_gatherer.py | 5 ++--- run_blockchain_terraformer.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/fastlane_bot/events/event_gatherer.py b/fastlane_bot/events/event_gatherer.py index 9c7b67a33..30b7bf7f3 100644 --- a/fastlane_bot/events/event_gatherer.py +++ b/fastlane_bot/events/event_gatherer.py @@ -1,5 +1,4 @@ import asyncio -import traceback from itertools import chain from typing import Dict, List @@ -96,5 +95,5 @@ async def _get_logs_recursive(self, from_block: int, to_block: int, topics: List ) return [log for log_list in log_lists for log in log_list] else: - traceback.print_exc(e) - return [] + raise e + raise Exception(f"Illegal log query range: {from_block} -> {to_block}") diff --git a/run_blockchain_terraformer.py b/run_blockchain_terraformer.py index 259c93532..df514dbee 100644 --- a/run_blockchain_terraformer.py +++ b/run_blockchain_terraformer.py @@ -1,5 +1,4 @@ import math -import traceback from typing import Tuple, List, Dict import pandas as pd @@ -687,8 +686,8 @@ def get_events_recursive(get_logs: any, start_block: int, end_block: int) -> lis event_list_2 = get_events_recursive(get_logs, mid_block + 1, end_block) return event_list_1 + event_list_2 else: - traceback.print_exc(e) - return [] + raise e + raise Exception(f"Illegal log query range: {start_block} -> {end_block}") def get_uni_v3_pools( From 6769d81a92d0f9495b4386eed486fdba8064428e Mon Sep 17 00:00:00 2001 From: Platon Floria Date: Tue, 21 May 2024 20:02:23 +0800 Subject: [PATCH 5/5] fix: take network name from config --- main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.py b/main.py index 37a8d7490..722539278 100644 --- a/main.py +++ b/main.py @@ -307,7 +307,7 @@ def run(mgr, args, tenderly_uri=None) -> None: handle_static_pools_update(mgr) event_gatherer = EventGatherer( - blockchain=args.blockchain, + blockchain=mgr.cfg.network.NETWORK, w3=mgr.w3_async, exchanges=mgr.exchanges, event_contracts=mgr.event_contracts