Skip to content

Commit

Permalink
next:
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha committed Jul 7, 2024
1 parent 7d901bc commit 2112e59
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pragma-sdk/pragma_sdk/offchain/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ async def get_expiries_list(self, pair: Pair):
raise HTTPError(
f"Unable to GET /v1{base_asset}/{quote_asset}/future_expiries for pair {pair} "
)
return response
return json_response


def get_endpoint_publish_offchain(data_type: DataTypes):
Expand Down
11 changes: 4 additions & 7 deletions price-pusher/price_pusher/configs/price_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,10 @@ def get_all_assets(self) -> Dict[DataTypes, List[Pair]]:
Dict from DataTypes to List of Pairs.
"""
pair_dict_by_type = {}
pair_dict_by_type[DataTypes.SPOT] = self.get_unique_spot_pairs()
if len(pair_dict_by_type[DataTypes.SPOT]) == 0:
del pair_dict_by_type[DataTypes.SPOT]
pair_dict_by_type[DataTypes.FUTURE] = self.get_unique_future_pairs()
if len(pair_dict_by_type[DataTypes.FUTURE]) == 0:
del pair_dict_by_type[DataTypes.FUTURE]

if spot_pairs := self.get_unique_spot_pairs():
pair_dict_by_type[DataTypes.SPOT] = spot_pairs
if future_pairs := self.get_unique_future_pairs():
pair_dict_by_type[DataTypes.FUTURE] = future_pairs
return pair_dict_by_type


Expand Down
94 changes: 65 additions & 29 deletions price-pusher/price_pusher/core/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@
import logging

from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, Dict, Union

from pragma_utils.readable_id import generate_human_readable_id, HumanReadableId


from pragma_sdk.common.types.entry import Entry
from pragma_sdk.common.types.entry import Entry, SpotEntry, FutureEntry
from pragma_sdk.common.types.types import DataTypes

from price_pusher.utils import exclude_none_and_exceptions, flatten_list
from price_pusher.configs import PriceConfig
from price_pusher.core.request_handlers.interface import IRequestHandler
from price_pusher.types import (
DurationInSeconds,
LatestOrchestratorPairPrices,
)
from price_pusher.types import DurationInSeconds, LatestOrchestratorPairPrices, ExpiryTimestamp


CONSECUTIVES_EMPTY_ERRORS_LIMIT = 10
logger = logging.getLogger(__name__)


Expand All @@ -37,6 +35,8 @@ class IPriceListener(ABC):
notification_event: asyncio.Event
polling_frequency_in_s: DurationInSeconds

consecutive_empty_oracle_error: int

@abstractmethod
def set_orchestrator_prices(
self, orchestrator_prices: LatestOrchestratorPairPrices
Expand All @@ -49,7 +49,7 @@ async def run_forever(self) -> None: ...
async def _fetch_all_oracle_prices(self) -> None: ...

@abstractmethod
def _get_most_recent_orchestrator_entry(
def _get_latest_orchestrator_entry(
self, pair_id: str, data_type: DataTypes
) -> Optional[Entry]: ...

Expand Down Expand Up @@ -88,6 +88,8 @@ def __init__(
self.notification_event = asyncio.Event()
self.polling_frequency_in_s = polling_frequency_in_s

self.consecutive_empty_oracle_error = 0

self._log_listener_spawning()

async def run_forever(self) -> None:
Expand Down Expand Up @@ -140,9 +142,14 @@ async def _fetch_all_oracle_prices(self) -> None:
case DataTypes.SPOT:
self.oracle_prices[pair_id][data_type] = entry
case DataTypes.FUTURE:
if expiry is None:
logger.error(
f"Expiry none for future entry from Oracle: {pair_id}. Ignoring."
)
continue
self.oracle_prices[pair_id][data_type][expiry] = entry

def _get_most_recent_orchestrator_entry(
def _get_latest_orchestrator_entry(
self, pair_id: str, data_type: DataTypes, expiry: str = None
) -> Optional[Entry]:
"""
Expand Down Expand Up @@ -183,38 +190,72 @@ async def _does_oracle_needs_update(self) -> bool:
logging.error(
f"LISTENER {self.id} have no oracle prices at all... Sending notification."
)
self.consecutive_empty_oracle_error += 1
if self.consecutive_empty_oracle_error >= CONSECUTIVES_EMPTY_ERRORS_LIMIT:
raise ValueError(
"⛔ Oracle entries are still empty after "
f"{self.consecutive_empty_oracle_error} tries. "
"Publishing does not seem to work?"
)
return True

for pair_id, oracle_entries in self.oracle_prices.items():
if pair_id not in self.orchestrator_prices:
continue

for data_type, orchestrator_entries in self.orchestrator_prices[pair_id].items():
if data_type not in oracle_entries:
logging.warn(
f"LISTENER {self.id} miss prices in oracle entries. Sending notification."
f"LISTENER {self.id} miss {data_type} prices in oracle entries. Sending notification."
)
return True

oracle_entry = oracle_entries[data_type]

# First check if the oracle entry is outdated
# TODO: incorrect :)
newest_entry = self._get_most_recent_orchestrator_entry(pair_id, data_type)

if self._oracle_entry_is_outdated(pair_id, oracle_entry, newest_entry):
return True

# If not, check its deviation
# For Spot entries, oracle_entry will be an Entry
# For future entries, it will be a Dict[str, Entry] for each expiry.
oracle_entry: Union[SpotEntry, Dict[ExpiryTimestamp, FutureEntry]] = oracle_entries[
data_type
]

# Check if entries are not outdated...
match data_type:
case DataTypes.SPOT:
newest_spot_entry = self._get_latest_orchestrator_entry(pair_id, data_type)
if self._oracle_entry_is_outdated(pair_id, oracle_entry, newest_spot_entry):
return True
case DataTypes.FUTURE:
if self._future_entries_are_outdated(pair_id, data_type, oracle_entry):
return True

# If they're not, we now check for the price deviation
for entry in orchestrator_entries.values():
match data_type:
case DataTypes.SPOT:
oracle_entry_price = oracle_entry.price
if self._new_price_is_deviating(
pair_id, entry.price, oracle_entry_price
):
return True
case DataTypes.FUTURE:
expiry = entry.get_expiry()
oracle_entry_price = oracle_entry[expiry].price
for expiry, _entry in entry.items():
oracle_entry_price = oracle_entry[expiry].price
if self._new_price_is_deviating(
pair_id, _entry.price, oracle_entry_price
):
return True

if self._new_price_is_deviating(pair_id, entry.price, oracle_entry_price):
return True
return False

def _future_entries_are_outdated(
self, pair_id: str, data_type: DataTypes, entries: Dict[ExpiryTimestamp, FutureEntry]
) -> bool:
"""
Compare each timestamp future price together & returns True if one of them is
outdated.
"""
for expiry, oracle_entry in entries.items():
orchestrator_entry = self._get_latest_orchestrator_entry(pair_id, data_type, expiry)
if self._oracle_entry_is_outdated(pair_id, oracle_entry, orchestrator_entry):
return True
return False

def _new_price_is_deviating(self, pair_id: str, new_price: int, oracle_price: int) -> bool:
Expand All @@ -240,12 +281,7 @@ def _oracle_entry_is_outdated(
the orchestrator and the most recent entry for the oracle.
"""
max_time_elapsed = self.price_config.time_difference
if newest_entry.get_asset_type() == DataTypes.SPOT:
oracle_entry_timestamp = oracle_entry.get_timestamp()
else:
latest_oracle_entry = max(oracle_entry.values(), key=lambda x: x.get_timestamp())
oracle_entry_timestamp = latest_oracle_entry.get_timestamp()

oracle_entry_timestamp = oracle_entry.get_timestamp()
delta_t = newest_entry.get_timestamp() - oracle_entry_timestamp
is_outdated = delta_t > max_time_elapsed

Expand Down
2 changes: 1 addition & 1 deletion price-pusher/price_pusher/core/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def update_price_feeds(self, entries: List[Entry]) -> Optional[Dict]:
try:
response = await self.client.publish_entries(entries) # TODO: add execution config
logger.info(f"🏋️ PUSHER: ✅ Successfully published {len(entries)} entries!")
logger.debug(f"Response {response}")
logger.debug(f"Response from the API: {response}")

return response
except Exception as e:
Expand Down
2 changes: 0 additions & 2 deletions price-pusher/price_pusher/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,3 @@ def callback_update_prices(self, entries: List[Entry]) -> None:
if source not in self.latest_prices[pair_id][data_type]:
self.latest_prices[pair_id][data_type][source] = {}
self.latest_prices[pair_id][data_type][source][expiry] = entry

print(self.latest_prices)
3 changes: 2 additions & 1 deletion price-pusher/price_pusher/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

PairId = str
SourceName = str
FuturePrices = Dict[str, Entry]
ExpiryTimestamp = str
FuturePrices = Dict[ExpiryTimestamp, Entry]
LatestOrchestratorPairPrices = Dict[
PairId, Dict[DataTypes, Union[Dict[SourceName, Entry], Dict[SourceName, FuturePrices]]]
]
Expand Down

0 comments on commit 2112e59

Please sign in to comment.