diff --git a/pragma-sdk/pragma_sdk/offchain/client.py b/pragma-sdk/pragma_sdk/offchain/client.py index 1dd13bf2..2b197a5a 100644 --- a/pragma-sdk/pragma_sdk/offchain/client.py +++ b/pragma-sdk/pragma_sdk/offchain/client.py @@ -385,7 +385,7 @@ async def get_expiries_list(self, pair: Pair): raise HTTPError( f"Unable to GET /v1{base_asset}/{quote_asset}/future_expiries for pair {pair} " ) - return response + return json_response def get_endpoint_publish_offchain(data_type: DataTypes): diff --git a/price-pusher/price_pusher/configs/price_config.py b/price-pusher/price_pusher/configs/price_config.py index 0cba86f2..c09a4554 100644 --- a/price-pusher/price_pusher/configs/price_config.py +++ b/price-pusher/price_pusher/configs/price_config.py @@ -91,13 +91,10 @@ def get_all_assets(self) -> Dict[DataTypes, List[Pair]]: Dict from DataTypes to List of Pairs. """ pair_dict_by_type = {} - pair_dict_by_type[DataTypes.SPOT] = self.get_unique_spot_pairs() - if len(pair_dict_by_type[DataTypes.SPOT]) == 0: - del pair_dict_by_type[DataTypes.SPOT] - pair_dict_by_type[DataTypes.FUTURE] = self.get_unique_future_pairs() - if len(pair_dict_by_type[DataTypes.FUTURE]) == 0: - del pair_dict_by_type[DataTypes.FUTURE] - + if spot_pairs := self.get_unique_spot_pairs(): + pair_dict_by_type[DataTypes.SPOT] = spot_pairs + if future_pairs := self.get_unique_future_pairs(): + pair_dict_by_type[DataTypes.FUTURE] = future_pairs return pair_dict_by_type diff --git a/price-pusher/price_pusher/core/listener.py b/price-pusher/price_pusher/core/listener.py index 4586e691..1438c864 100644 --- a/price-pusher/price_pusher/core/listener.py +++ b/price-pusher/price_pusher/core/listener.py @@ -2,23 +2,21 @@ import logging from abc import ABC, abstractmethod -from typing import Optional +from typing import Optional, Dict, Union from pragma_utils.readable_id import generate_human_readable_id, HumanReadableId -from pragma_sdk.common.types.entry import Entry +from pragma_sdk.common.types.entry import Entry, SpotEntry, FutureEntry from pragma_sdk.common.types.types import DataTypes from price_pusher.utils import exclude_none_and_exceptions, flatten_list from price_pusher.configs import PriceConfig from price_pusher.core.request_handlers.interface import IRequestHandler -from price_pusher.types import ( - DurationInSeconds, - LatestOrchestratorPairPrices, -) +from price_pusher.types import DurationInSeconds, LatestOrchestratorPairPrices, ExpiryTimestamp +CONSECUTIVES_EMPTY_ERRORS_LIMIT = 10 logger = logging.getLogger(__name__) @@ -37,6 +35,8 @@ class IPriceListener(ABC): notification_event: asyncio.Event polling_frequency_in_s: DurationInSeconds + consecutive_empty_oracle_error: int + @abstractmethod def set_orchestrator_prices( self, orchestrator_prices: LatestOrchestratorPairPrices @@ -49,7 +49,7 @@ async def run_forever(self) -> None: ... async def _fetch_all_oracle_prices(self) -> None: ... @abstractmethod - def _get_most_recent_orchestrator_entry( + def _get_latest_orchestrator_entry( self, pair_id: str, data_type: DataTypes ) -> Optional[Entry]: ... @@ -88,6 +88,8 @@ def __init__( self.notification_event = asyncio.Event() self.polling_frequency_in_s = polling_frequency_in_s + self.consecutive_empty_oracle_error = 0 + self._log_listener_spawning() async def run_forever(self) -> None: @@ -140,9 +142,14 @@ async def _fetch_all_oracle_prices(self) -> None: case DataTypes.SPOT: self.oracle_prices[pair_id][data_type] = entry case DataTypes.FUTURE: + if expiry is None: + logger.error( + f"Expiry none for future entry from Oracle: {pair_id}. Ignoring." + ) + continue self.oracle_prices[pair_id][data_type][expiry] = entry - def _get_most_recent_orchestrator_entry( + def _get_latest_orchestrator_entry( self, pair_id: str, data_type: DataTypes, expiry: str = None ) -> Optional[Entry]: """ @@ -183,38 +190,72 @@ async def _does_oracle_needs_update(self) -> bool: logging.error( f"LISTENER {self.id} have no oracle prices at all... Sending notification." ) + self.consecutive_empty_oracle_error += 1 + if self.consecutive_empty_oracle_error >= CONSECUTIVES_EMPTY_ERRORS_LIMIT: + raise ValueError( + "⛔ Oracle entries are still empty after " + f"{self.consecutive_empty_oracle_error} tries. " + "Publishing does not seem to work?" + ) return True for pair_id, oracle_entries in self.oracle_prices.items(): if pair_id not in self.orchestrator_prices: continue + for data_type, orchestrator_entries in self.orchestrator_prices[pair_id].items(): if data_type not in oracle_entries: logging.warn( - f"LISTENER {self.id} miss prices in oracle entries. Sending notification." + f"LISTENER {self.id} miss {data_type} prices in oracle entries. Sending notification." ) return True - oracle_entry = oracle_entries[data_type] - - # First check if the oracle entry is outdated - # TODO: incorrect :) - newest_entry = self._get_most_recent_orchestrator_entry(pair_id, data_type) - - if self._oracle_entry_is_outdated(pair_id, oracle_entry, newest_entry): - return True - - # If not, check its deviation + # For Spot entries, oracle_entry will be an Entry + # For future entries, it will be a Dict[str, Entry] for each expiry. + oracle_entry: Union[SpotEntry, Dict[ExpiryTimestamp, FutureEntry]] = oracle_entries[ + data_type + ] + + # Check if entries are not outdated... + match data_type: + case DataTypes.SPOT: + newest_spot_entry = self._get_latest_orchestrator_entry(pair_id, data_type) + if self._oracle_entry_is_outdated(pair_id, oracle_entry, newest_spot_entry): + return True + case DataTypes.FUTURE: + if self._future_entries_are_outdated(pair_id, data_type, oracle_entry): + return True + + # If they're not, we now check for the price deviation for entry in orchestrator_entries.values(): match data_type: case DataTypes.SPOT: oracle_entry_price = oracle_entry.price + if self._new_price_is_deviating( + pair_id, entry.price, oracle_entry_price + ): + return True case DataTypes.FUTURE: - expiry = entry.get_expiry() - oracle_entry_price = oracle_entry[expiry].price + for expiry, _entry in entry.items(): + oracle_entry_price = oracle_entry[expiry].price + if self._new_price_is_deviating( + pair_id, _entry.price, oracle_entry_price + ): + return True - if self._new_price_is_deviating(pair_id, entry.price, oracle_entry_price): - return True + return False + + def _future_entries_are_outdated( + self, pair_id: str, data_type: DataTypes, entries: Dict[ExpiryTimestamp, FutureEntry] + ) -> bool: + """ + Compare each timestamp future price together & returns True if one of them is + outdated. + """ + for expiry, oracle_entry in entries.items(): + orchestrator_entry = self._get_latest_orchestrator_entry(pair_id, data_type, expiry) + if self._oracle_entry_is_outdated(pair_id, oracle_entry, orchestrator_entry): + return True return False def _new_price_is_deviating(self, pair_id: str, new_price: int, oracle_price: int) -> bool: @@ -240,12 +281,7 @@ def _oracle_entry_is_outdated( the orchestrator and the most recent entry for the oracle. """ max_time_elapsed = self.price_config.time_difference - if newest_entry.get_asset_type() == DataTypes.SPOT: - oracle_entry_timestamp = oracle_entry.get_timestamp() - else: - latest_oracle_entry = max(oracle_entry.values(), key=lambda x: x.get_timestamp()) - oracle_entry_timestamp = latest_oracle_entry.get_timestamp() - + oracle_entry_timestamp = oracle_entry.get_timestamp() delta_t = newest_entry.get_timestamp() - oracle_entry_timestamp is_outdated = delta_t > max_time_elapsed diff --git a/price-pusher/price_pusher/core/pusher.py b/price-pusher/price_pusher/core/pusher.py index fd762724..f9e68887 100644 --- a/price-pusher/price_pusher/core/pusher.py +++ b/price-pusher/price_pusher/core/pusher.py @@ -27,7 +27,7 @@ async def update_price_feeds(self, entries: List[Entry]) -> Optional[Dict]: try: response = await self.client.publish_entries(entries) # TODO: add execution config logger.info(f"🏋️ PUSHER: ✅ Successfully published {len(entries)} entries!") - logger.debug(f"Response {response}") + logger.debug(f"Response from the API: {response}") return response except Exception as e: diff --git a/price-pusher/price_pusher/orchestrator.py b/price-pusher/price_pusher/orchestrator.py index 7ad6888d..36899b31 100644 --- a/price-pusher/price_pusher/orchestrator.py +++ b/price-pusher/price_pusher/orchestrator.py @@ -183,5 +183,3 @@ def callback_update_prices(self, entries: List[Entry]) -> None: if source not in self.latest_prices[pair_id][data_type]: self.latest_prices[pair_id][data_type][source] = {} self.latest_prices[pair_id][data_type][source][expiry] = entry - - print(self.latest_prices) diff --git a/price-pusher/price_pusher/types.py b/price-pusher/price_pusher/types.py index af7597b7..937418df 100644 --- a/price-pusher/price_pusher/types.py +++ b/price-pusher/price_pusher/types.py @@ -7,7 +7,8 @@ PairId = str SourceName = str -FuturePrices = Dict[str, Entry] +ExpiryTimestamp = str +FuturePrices = Dict[ExpiryTimestamp, Entry] LatestOrchestratorPairPrices = Dict[ PairId, Dict[DataTypes, Union[Dict[SourceName, Entry], Dict[SourceName, FuturePrices]]] ]