diff --git a/pdr_backend/models/contract_data.py b/pdr_backend/models/contract_data.py new file mode 100644 index 000000000..cd06a5211 --- /dev/null +++ b/pdr_backend/models/contract_data.py @@ -0,0 +1,32 @@ +class ContractData: + def __init__( + self, + name: str, + address: str, + symbol: str, + seconds_per_epoch: int, + seconds_per_subscription: int, + trueval_submit_timeout: int, + owner: str, + pair: str, + timeframe: str, + source: str, + ): + self.name = name + self.address = address + self.symbol = symbol + self.seconds_per_epoch = seconds_per_epoch + self.seconds_per_subscription = seconds_per_subscription + self.trueval_submit_timeout = trueval_submit_timeout + self.owner = owner + self.pair = pair + self.timeframe = timeframe + self.source = source + + @property + def quote(self): + return self.pair.split("-")[1] + + @property + def base(self): + return self.pair.split("-")[0] diff --git a/pdr_backend/models/slot.py b/pdr_backend/models/slot.py new file mode 100644 index 000000000..6c60319f8 --- /dev/null +++ b/pdr_backend/models/slot.py @@ -0,0 +1,7 @@ +from pdr_backend.models.contract_data import ContractData + + +class Slot: + def __init__(self, slot: int, contract: ContractData): + self.slot = slot + self.contract = contract diff --git a/pdr_backend/models/test/test_contract_data.py b/pdr_backend/models/test/test_contract_data.py new file mode 100644 index 000000000..e29d763e2 --- /dev/null +++ b/pdr_backend/models/test/test_contract_data.py @@ -0,0 +1,29 @@ +from pdr_backend.models.contract_data import ContractData + + +def test_contract_data_initialization(): + contract = ContractData( + "Contract Name", + "0x12345", + "test", + 300, + 60, + 15, + "0xowner", + "BTC-ETH", + "1h", + "binance", + ) + + assert contract.name == "Contract Name" + assert contract.address == "0x12345" + assert contract.symbol == "test" + assert contract.seconds_per_epoch == 300 + assert contract.seconds_per_subscription == 60 + assert contract.trueval_submit_timeout == 15 + assert contract.owner == "0xowner" + assert contract.pair == "BTC-ETH" + assert contract.timeframe == "1h" + assert contract.source == "binance" + assert contract.quote == "ETH" + assert contract.base == "BTC" diff --git a/pdr_backend/models/test/test_slot.py b/pdr_backend/models/test/test_slot.py new file mode 100644 index 000000000..e2ed26d25 --- /dev/null +++ b/pdr_backend/models/test/test_slot.py @@ -0,0 +1,24 @@ +from pdr_backend.models.slot import Slot +from pdr_backend.models.contract_data import ContractData + + +def test_slot_initialization(): + contract = ContractData( + "Contract Name", + "0x12345", + "test", + 300, + 60, + 15, + "0xowner", + "BTC/ETH", + "1h", + "binance", + ) + + slot_number = 5 + slot = Slot(slot_number, contract) + + assert slot.slot == slot_number + assert slot.contract == contract + assert isinstance(slot.contract, ContractData) diff --git a/pdr_backend/trueval/main.py b/pdr_backend/trueval/main.py index 5f946f892..b225185f0 100644 --- a/pdr_backend/trueval/main.py +++ b/pdr_backend/trueval/main.py @@ -1,111 +1,114 @@ -from os import getenv -from threading import Thread import time +from os import getenv from typing import Dict from pdr_backend.models.predictoor_contract import PredictoorContract from pdr_backend.util.env import getenv_or_exit -from pdr_backend.util.subgraph import query_predictContracts from pdr_backend.trueval.trueval import get_true_val from pdr_backend.util.web3_config import Web3Config +from pdr_backend.util.subgraph import get_pending_slots +from pdr_backend.models.slot import Slot -rpc_url = getenv_or_exit("RPC_URL") -subgraph_url = getenv_or_exit("SUBGRAPH_URL") -private_key = getenv_or_exit("PRIVATE_KEY") -pair_filters = getenv("PAIR_FILTER") -timeframe_filter = getenv("TIMEFRAME_FILTER") -source_filter = getenv("SOURCE_FILTER") -owner_addresses = getenv("OWNER_ADDRS") - -web3_config = Web3Config(rpc_url, private_key) -owner = web3_config.owner -""" Get all intresting topics that we can submit trueval """ -topics: Dict[str, dict] = {} +contract_cache: Dict[str, tuple] = {} -class NewTrueVal(Thread): - def __init__(self, topic, predictoor_contract, current_ts, epoch): - Thread.__init__(self) - - # set a default value - self.values = { - "last_submited_epoch": epoch, - "contract_address": predictoor_contract.contract_address, - } - self.topic = topic - self.epoch = epoch +class NewTrueVal: + def __init__( + self, + slot: Slot, + predictoor_contract: PredictoorContract, + seconds_per_epoch, + ): + self.slot = slot self.predictoor_contract = predictoor_contract - self.current_ts = current_ts + self.seconds_per_epoch = seconds_per_epoch - def run(self): + def run(self) -> dict: """ Get timestamp of previous epoch-2 , get the price Get timestamp of previous epoch-1, get the price Compare and submit trueval """ - seconds_per_epoch = self.predictoor_contract.get_secondsPerEpoch() - initial_ts = (self.epoch - 2) * seconds_per_epoch - - end_ts = (self.epoch - 1) * seconds_per_epoch + self.slot.slot = int(self.slot.slot) + initial_ts = self.slot.slot - self.seconds_per_epoch + end_ts = self.slot.slot - slot = (self.epoch - 1) * seconds_per_epoch + (true_val, error) = get_true_val(self.slot.contract, initial_ts, end_ts) + if error: + raise Exception( + f"Error getting trueval for {self.slot.contract.pair} and slot {self.slot.slot}" + ) - (true_val, cancel_round) = get_true_val(self.topic, initial_ts, end_ts) + # pylint: disable=line-too-long print( - f"Contract:{self.predictoor_contract.contract_address} - " - f"Submitting true_val {true_val} for slot:{slot}" + f"Contract:{self.predictoor_contract.contract_address} - Submitting true_val {true_val} and slot:{self.slot.slot}" + ) + + tx = self.predictoor_contract.submit_trueval( + true_val, self.slot.slot, False, True + ) + + return tx + + +def process_slot(slot: Slot, web3_config: Web3Config) -> dict: + contract_address = slot.contract.address + if contract_address in contract_cache: + predictoor_contract, seconds_per_epoch = contract_cache[contract_address] + else: + predictoor_contract = PredictoorContract(web3_config, contract_address) + seconds_per_epoch = predictoor_contract.get_secondsPerEpoch() + contract_cache[contract_address] = ( + predictoor_contract, + seconds_per_epoch, ) - try: - self.predictoor_contract.submit_trueval(true_val, slot, cancel_round) - except Exception as e: - print(e) + trueval = NewTrueVal(slot, predictoor_contract, seconds_per_epoch) + return trueval.run() + + +def main(testing=False): + rpc_url = getenv_or_exit("RPC_URL") + subgraph_url = getenv_or_exit("SUBGRAPH_URL") + private_key = getenv_or_exit("PRIVATE_KEY") + pair_filter = getenv("PAIR_FILTER") + timeframe_filter = getenv("TIMEFRAME_FILTER") + source_filter = getenv("SOURCE_FILTER") + owner_addresses = getenv("OWNER_ADDRS") + sleep_time = int(getenv("SLEEP_TIME", "30")) + batch_size = int(getenv("BATCH_SIZE", "50")) + web3_config = Web3Config(rpc_url, private_key) -def process_block(block): - """Process each contract and see if we need to submit""" - global topics - if not topics: - topics = query_predictContracts( + while True: + timestamp = web3_config.w3.eth.get_block("latest")["timestamp"] + pending_slots = get_pending_slots( subgraph_url, - pair_filters, + timestamp, + owner_addresses, + pair_filter, timeframe_filter, source_filter, - owner_addresses, ) - print(f"Got new block: {block['number']} with {len(topics)} topics") - for address in topics: - topic = topics[address] - predictoor_contract = PredictoorContract(web3_config, address) - epoch = predictoor_contract.get_current_epoch() - seconds_per_epoch = predictoor_contract.get_secondsPerEpoch() - seconds_till_epoch_end = ( - epoch * seconds_per_epoch + seconds_per_epoch - block["timestamp"] - ) - print( - f"\t{topic['name']} (at address {topic['address']} " - f"is at epoch {epoch}, seconds_per_epoch: {seconds_per_epoch}" - f", seconds_till_epoch_end: {seconds_till_epoch_end}" - ) - if epoch > topic["last_submited_epoch"] and epoch > 1: - # Let's make a prediction & claim rewards - thr = NewTrueVal(topic, predictoor_contract, block["timestamp"], epoch) - thr.run() - address = thr.values["contract_address"].lower() - new_epoch = thr.values["last_submited_epoch"] - topics[address]["last_submited_epoch"] = new_epoch - - -def main(): - print("Starting main loop...") - lastblock = 0 - while True: - block = web3_config.w3.eth.block_number - if block > lastblock: - lastblock = block - process_block(web3_config.w3.eth.get_block(block, full_transactions=False)) - else: - time.sleep(1) + print(f"Found {len(pending_slots)} pending slots, processing {batch_size}") + pending_slots = pending_slots[:batch_size] + + if len(pending_slots) == 0: + print(f"No pending slots, sleeping for {sleep_time} seconds...") + time.sleep(sleep_time) + continue + + for slot in pending_slots: + print("-" * 30) + print(f"Processing slot {slot.slot} for contract {slot.contract.address}") + try: + process_slot(slot, web3_config) + except Exception as e: + print("An error occured", e) + if testing: + break + print(f"Done processing, sleeping for {sleep_time} seconds...") + time.sleep(sleep_time) if __name__ == "__main__": diff --git a/pdr_backend/trueval/test/test_trueval.py b/pdr_backend/trueval/test/test_trueval.py index fcfe9b3c3..bd7f82277 100644 --- a/pdr_backend/trueval/test/test_trueval.py +++ b/pdr_backend/trueval/test/test_trueval.py @@ -1,5 +1,91 @@ +from unittest.mock import patch from pdr_backend.trueval.trueval import get_true_val +from pdr_backend.models.contract_data import ContractData -def test_get_true_val(): - pass +def mock_fetch_ohlcv(*args, **kwargs): + since = kwargs.get("since") + if since == 1: + return [[None, 100]] + elif since == 2: + return [[None, 200]] + else: + raise ValueError("Invalid timestamp") + + +def mock_fetch_ohlcv_fail(*args, **kwargs): + return [] + + +def test_get_trueval_success(): + contract = ContractData( + name="ETH-USDT", + address="0x1", + symbol="ETH-USDT", + seconds_per_epoch=60, + seconds_per_subscription=500, + pair="eth-usdt", + source="kraken", + timeframe="5m", + trueval_submit_timeout=100, + owner="0xowner", + ) + + with patch("ccxt.kraken.fetch_ohlcv", mock_fetch_ohlcv): + result = get_true_val(contract, 1, 2) + assert result == (True, False) # 1st True because 200 > 100 + + +def test_get_trueval_live_lowercase_slash(): + contract = ContractData( + name="ETH-USDT", + address="0x1", + symbol="ETH-USDT", + seconds_per_epoch=60, + seconds_per_subscription=500, + pair="btc/usdt", + source="kraken", + timeframe="5m", + trueval_submit_timeout=100, + owner="0xowner", + ) + + result = get_true_val(contract, 1692943200, 1692943500) + assert result == (True, False) + + +def test_get_trueval_live_lowercase_dash(): + contract = ContractData( + name="ETH-USDT", + address="0x1", + symbol="ETH-USDT", + seconds_per_epoch=60, + seconds_per_subscription=500, + pair="btc-usdt", + source="kraken", + timeframe="5m", + trueval_submit_timeout=100, + owner="0xowner", + ) + + result = get_true_val(contract, 1692943200, 1692943500) + assert result == (True, False) + + +def test_get_trueval_fail(): + contract = ContractData( + name="ETH-USDT", + address="0x1", + symbol="ETH-USDT", + seconds_per_epoch=60, + seconds_per_subscription=500, + pair="eth-usdt", + source="kraken", + timeframe="5m", + trueval_submit_timeout=100, + owner="0xowner", + ) + + with patch("ccxt.kraken.fetch_ohlcv", mock_fetch_ohlcv_fail): + result = get_true_val(contract, 1, 2) + assert result == (False, True) # 2nd True because failed diff --git a/pdr_backend/trueval/test/test_trueval_main.py b/pdr_backend/trueval/test/test_trueval_main.py index f7f64538f..fecb6955f 100644 --- a/pdr_backend/trueval/test/test_trueval_main.py +++ b/pdr_backend/trueval/test/test_trueval_main.py @@ -1,5 +1,124 @@ -from pdr_backend.trueval.main import NewTrueVal, process_block, main +import pytest +from unittest.mock import patch, Mock, MagicMock +from pdr_backend.models.contract_data import ContractData +from pdr_backend.models.slot import Slot +from pdr_backend.trueval.main import NewTrueVal, process_slot, main, contract_cache +from pdr_backend.util.web3_config import Web3Config -def test_trueval_main(): - pass +def test_new_trueval(slot): + mock_contract_class = mock_contract() + trueval = NewTrueVal(slot, mock_contract_class, 60) + result = trueval.run() + assert result == {"tx": "0x123"} + mock_contract_class.submit_trueval.assert_called_once_with( + True, 1692943200, False, True + ) + + +def test_trueval_main(slot): + with patch( + "pdr_backend.trueval.main.PredictoorContract", return_value=mock_contract() + ) as mock_predictoor_contract: + result = process_slot(slot, None) + assert result == {"tx": "0x123"} + mock_predictoor_contract.assert_called_once_with(None, "0x1") + mock_predictoor_contract.return_value.submit_trueval.assert_called_once_with( + True, 1692943200, False, True + ) + + +def test_trueval_main_cached(slot): + with patch( + "pdr_backend.trueval.main.PredictoorContract", return_value=mock_contract() + ) as mock_predictoor_contract: + process_slot(slot, None) + process_slot(slot, None) + assert mock_predictoor_contract.call_count == 1 + + +def test_trueval_with_mocked_price(slot): + with patch( + "pdr_backend.trueval.main.PredictoorContract", return_value=mock_contract() + ) as mock_predictoor_contract: + with patch("ccxt.kraken.fetch_ohlcv", mock_fetch_ohlcv): + result = process_slot(slot, None) + assert result == {"tx": "0x123"} + mock_predictoor_contract.return_value.submit_trueval.assert_called_once_with( + False, 1692943200, False, True + ) + + +def test_main(slot): + mocked_env = { + "RPC_URL": "http://localhost:8545", + "SUBGRAPH_URL": "http://localhost:9000", + "PRIVATE_KEY": "0x2b93e10997249bbb0f8daab932f7ed03163ffb2d4c8a2cab02992b92d2ade6ba", + "SLEEP_TIME": "1", + "BATCH_SIZE": "1", + } + + mocked_web3_config = MagicMock() + + with patch.dict("os.environ", mocked_env), patch("time.sleep"), patch( + "pdr_backend.trueval.main.get_pending_slots", return_value=[slot] + ), patch( + "pdr_backend.trueval.main.Web3Config", return_value=mocked_web3_config + ), patch( + "pdr_backend.trueval.main.process_slot" + ) as ps_mock: + main(True) + + ps_mock.assert_called_once_with(slot, mocked_web3_config) + + +# ------------------------------------------------------------ +### Fixtures + + +@pytest.fixture(scope="module") +def slot(): + contract_data = ContractData( + name="ETH-USDT", + address="0x1", + symbol="ETH-USDT", + seconds_per_epoch=60, + seconds_per_subscription=500, + pair="eth-usdt", + source="kraken", + timeframe="5m", + trueval_submit_timeout=100, + owner="0xowner", + ) + + return Slot( + contract=contract_data, + slot=1692943200, + ) + + +@pytest.fixture(autouse=True) +def clear_cache(): + contract_cache.clear() + + +# ------------------------------------------------------------ +### Mocks + + +def mock_contract(*args, **kwarg): + m = Mock() + m.get_secondsPerEpoch.return_value = 60 + m.submit_trueval.return_value = {"tx": "0x123"} + m.contract_address = "0x1" + return m + + +def mock_fetch_ohlcv(*args, **kwargs): + since = kwargs.get("since") + if since == 1692943140: + return [[None, 200]] + elif since == 1692943200: + return [[None, 100]] + else: + raise ValueError("Invalid timestamp") diff --git a/pdr_backend/trueval/trueval.py b/pdr_backend/trueval/trueval.py index f31739abb..99e0c49a8 100644 --- a/pdr_backend/trueval/trueval.py +++ b/pdr_backend/trueval/trueval.py @@ -19,9 +19,10 @@ """ import ccxt +from pdr_backend.models.contract_data import ContractData -def get_true_val(topic, initial_timestamp, end_timestamp): +def get_true_val(topic: ContractData, initial_timestamp, end_timestamp): """Given a topic, Returns the true val between end_timestamp and initial_timestamp Topic object looks like: @@ -31,7 +32,6 @@ def get_true_val(topic, initial_timestamp, end_timestamp): "symbol":"ETH-USDT", "blocks_per_epoch":"60", "blocks_per_subscription":"86400", - "last_submited_epoch":0, "pair":"eth-usdt", "base":"eth", "quote":"usdt", @@ -40,15 +40,20 @@ def get_true_val(topic, initial_timestamp, end_timestamp): } """ + symbol = topic.pair + if topic.source == "binance" or topic.source == "kraken": + symbol = symbol.replace("-", "/") + symbol = symbol.upper() try: - exchange_class = getattr(ccxt, topic["source"]) + exchange_class = getattr(ccxt, topic.source) exchange_ccxt = exchange_class() price_initial = exchange_ccxt.fetch_ohlcv( - topic["pair"], "1m", since=initial_timestamp, limit=1 + symbol, "1m", since=initial_timestamp, limit=1 ) price_end = exchange_ccxt.fetch_ohlcv( - topic["pair"], "1m", since=end_timestamp, limit=1 + symbol, "1m", since=end_timestamp, limit=1 ) return (price_end[0][1] >= price_initial[0][1], False) - except Exception: - return (False, 0, True) + except Exception as e: + print(f"Error getting trueval for {symbol} {e}") + return (False, True) diff --git a/pdr_backend/util/subgraph.py b/pdr_backend/util/subgraph.py index 82dc5f7d2..5d501fa6a 100644 --- a/pdr_backend/util/subgraph.py +++ b/pdr_backend/util/subgraph.py @@ -54,12 +54,19 @@ } """ -from typing import Optional, Dict +import os +from typing import Optional, Dict, List + +import requests from enforce_typing import enforce_types import requests from web3 import Web3 +from pdr_backend.util.web3_config import Web3Config +from pdr_backend.models.contract_data import ContractData +from pdr_backend.models.slot import Slot + _N_ERRORS = {} # exception_str : num_occurrences _N_THR = 3 @@ -254,3 +261,107 @@ def query_predictContracts( # pylint: disable=too-many-statements return {} return contracts + + +def get_pending_slots( + subgraph_url: str, + timestamp: int, + owner_addresses: Optional[List[str]], + pair_filter: Optional[str] = None, + timeframe_filter: Optional[str] = None, + source_filter: Optional[str] = None, +): + chunk_size = 1000 + offset = 0 + owners: Optional[List[str]] = owner_addresses + + slots: List[Slot] = [] + + while True: + query = """ + { + predictSlots(where: {slot_lte: %s}, skip:%s, first:%s, where: { truevalSubmitted: false }){ + id + slot + trueValues { + id + } + predictContract { + id + token { + id + name + symbol + nft { + owner { + id + } + nftData { + key + value + } + } + } + secondsPerEpoch + secondsPerSubscription + truevalSubmitTimeout + } + } + } + """ % ( + timestamp, + offset, + chunk_size, + ) + + offset += chunk_size + try: + result = query_subgraph(subgraph_url, query) + if not "data" in result: + print("No data in result") + break + slot_list = result["data"]["predictSlots"] + if slot_list == []: + break + for slot in slot_list: + timestamp = slot["slot"] + if slot["trueValues"] != []: + continue + + contract = slot["predictContract"] + info725 = contract["token"]["nft"]["nftData"] + info = info_from_725(info725) + + owner_id = contract["token"]["nft"]["owner"]["id"] + if owners and (owner_id not in owners): + continue + + if pair_filter and (info["pair"] not in pair_filter): + continue + + if timeframe_filter and (info["timeframe"] not in timeframe_filter): + continue + + if source_filter and (info["source"] not in source_filter): + continue + + contract_object = ContractData( + name=contract["token"]["name"], + address=contract["id"], + symbol=contract["token"]["symbol"], + seconds_per_epoch=contract["secondsPerEpoch"], + seconds_per_subscription=contract["secondsPerSubscription"], + trueval_submit_timeout=contract["truevalSubmitTimeout"], + owner=contract["token"]["nft"]["owner"]["id"], + pair=info["pair"], + timeframe=info["timeframe"], + source=info["source"], + ) + + slots.append(Slot(int(slot["slot"]), contract_object)) + + except Exception as e: + print(e) + break + + return slots diff --git a/pdr_backend/util/test/test_subgraph.py b/pdr_backend/util/test/test_subgraph.py index ac25f1fe7..cd6237861 100644 --- a/pdr_backend/util/test/test_subgraph.py +++ b/pdr_backend/util/test/test_subgraph.py @@ -11,6 +11,7 @@ info_from_725, query_subgraph, query_predictContracts, + get_pending_slots, ) @@ -194,3 +195,62 @@ def test_filter(monkeypatch, expect_result, pairs, timeframes, sources, owners): contracts = query_predictContracts("foo", pairs, timeframes, sources, owners) assert bool(contracts) == bool(expect_result) + + +@enforce_types +def test_get_pending_slots(monkeypatch): + sample_slot_data = [ + { + "id": "slot1", + "slot": 1000, + "trueValues": [], + "predictContract": { + "id": "contract1", + "token": { + "id": "token1", + "name": "ether", + "symbol": "ETH", + "nft": { + "owner": {"id": "owner1"}, + "nftData": [ + { + "key": key_to_725("pair"), + "value": value_to_725("ETH/USDT"), + }, + { + "key": key_to_725("timeframe"), + "value": value_to_725("5m"), + }, + ], + }, + }, + "secondsPerEpoch": 7, + "secondsPerSubscription": 700, + "truevalSubmitTimeout": 5, + }, + } + ] + + call_count = 0 + + def mock_query_subgraph(subgraph_url, query): + nonlocal call_count + call_count += 1 + if call_count > 2: + return {"data": {"predictSlots": []}} + return {"data": {"predictSlots": sample_slot_data}} + + monkeypatch.setattr("pdr_backend.util.subgraph.query_subgraph", mock_query_subgraph) + + result = get_pending_slots( + subgraph_url="foo", + timestamp=2000, + owner_addresses=None, + pair_filter=None, + timeframe_filter=None, + source_filter=None, + ) + + assert len(result) == 2 + assert result[0].slot == 1000 + assert result[0].contract.name == "ether"