From 8a62b38b0434fefb3221c5fd98cecb9f32c4c049 Mon Sep 17 00:00:00 2001 From: azurwastaken Date: Thu, 4 Jul 2024 19:01:23 +0800 Subject: [PATCH] Next pp add future endpoint (#133) * its working boyz * remove log * fix lint --- .../price_pusher/core/request_handlers/api.py | 17 ++++++++++++----- price-pusher/price_pusher/orchestrator.py | 12 +++++++++--- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/price-pusher/price_pusher/core/request_handlers/api.py b/price-pusher/price_pusher/core/request_handlers/api.py index e0149a55..b20ebcf3 100644 --- a/price-pusher/price_pusher/core/request_handlers/api.py +++ b/price-pusher/price_pusher/core/request_handlers/api.py @@ -27,11 +27,18 @@ async def fetch_latest_entry(self, data_type: DataTypes, pair: Pair) -> Optional Fetch last entry for the asset from the API. TODO: Currently only works for spot assets. """ - entry_result: EntryResult = await self.client.get_entry( - pair=pair.__repr__(), - interval=Interval.ONE_MINUTE, - aggregation=AggregationMode.MEDIAN, - ) + if data_type is DataTypes.FUTURE: + entry_result: EntryResult = await self.client.get_future_entry( + pair=pair.__repr__(), + interval=Interval.ONE_MINUTE, + aggregation=AggregationMode.MEDIAN, + ) + else: + entry_result: EntryResult = await self.client.get_entry( + pair=pair.__repr__(), + interval=Interval.ONE_MINUTE, + aggregation=AggregationMode.MEDIAN, + ) entry = SpotEntry( pair_id=entry_result.pair_id, price=int(entry_result.data, 16), diff --git a/price-pusher/price_pusher/orchestrator.py b/price-pusher/price_pusher/orchestrator.py index ce45a538..1c40a0a0 100644 --- a/price-pusher/price_pusher/orchestrator.py +++ b/price-pusher/price_pusher/orchestrator.py @@ -128,10 +128,15 @@ def _flush_entries_for_assets(self, pairs_per_type: Dict[DataTypes, List[Pair]]) for data_type, pairs in pairs_per_type.items(): for pair in pairs: - if data_type not in self.latest_prices[f"{pair}"]: + pair_name = f"{pair}" + if ( + pair_name not in self.latest_prices + or data_type not in self.latest_prices[pair_name] + ): + logger.warning(f"ORCHESTRATOR : {pair_name} not found, continuing ...") continue - entries_to_push.extend(list(self.latest_prices[f"{pair}"][data_type].values())) - del self.latest_prices[f"{pair}"][data_type] + entries_to_push.extend(list(self.latest_prices[pair_name][data_type].values())) + del self.latest_prices[pair_name][data_type] return entries_to_push @@ -139,6 +144,7 @@ def callback_update_prices(self, entries: List[Entry]) -> None: """ Function called by the poller whenever new prices are retrieved. """ + for entry in entries: pair_id = entry.get_pair_id() source = entry.get_source()