From 2c29da9c20d23b94734624cb6e25d5770201a89a Mon Sep 17 00:00:00 2001 From: Richard Blythman Date: Wed, 16 Aug 2023 09:06:49 -0500 Subject: [PATCH 1/4] migrate real-time prediction loop from pdr-model-experiments update predictoor readme with instructions for models --- READMEs/predictoor.md | 16 ++ .../predictoor/examples/models/main.py | 230 ++++++++++++++++++ .../predictoor/examples/models/predict.py | 34 +++ pdr_backend/predictoor/predict.py | 2 +- pdr_backend/utils/contract.py | 4 +- setup.py | 1 + 6 files changed, 284 insertions(+), 3 deletions(-) create mode 100644 pdr_backend/predictoor/examples/models/main.py create mode 100644 pdr_backend/predictoor/examples/models/predict.py diff --git a/READMEs/predictoor.md b/READMEs/predictoor.md index 96f77ac24..fec53bfca 100644 --- a/READMEs/predictoor.md +++ b/READMEs/predictoor.md @@ -39,7 +39,12 @@ export ADDRESS_FILE="${HOME}/.ocean/ocean-contracts/artifacts/address.json" export RPC_URL=http://127.0.0.1:8545 export SUBGRAPH_URL="http://172.15.0.15:8000/subgraphs/name/oceanprotocol/ocean-subgraph" export PRIVATE_KEY="0xef4b441145c1d0f3b4bc6d61d29f5c6e502359481152f869247c7a4244d45209" +``` + +### Option A: Use Random Predictions +In the same console: +``` # run predictoor agent python3 pdr_backend/predictoor/main.py ``` @@ -50,6 +55,17 @@ You can query predictoor subgraph for detailed run info. See [subgraph.md](subgr The agent predicts according to the `predict()` function in [`pdr_backend/predictoor/predict.py`](../pdr_backend/predictoor/predict.py). Its default strategy is simplistic (random predictions). So you need to customize it. The docstring at the top of `predict.py` provides more info. +### Option B: Use Simple or Custom Model + +- Clone the simple model [repo](https://github.com/oceanprotocol/pdr-model-simple) or fork the repo and create a custom model +- From pdr-backend, make a symlink to the repo subdirectory for the simple model (or your own customized model) e.g. `ln -s /path/to/pdr-model-simple/pdr_model_simple/ pdr_backend/predictoor/examples/models` + +In the same console: +``` +# run predictoor agent +python3 pdr_backend/predictoor/examples/models/main.py +``` + ## Remote Testnet Usage FIXME diff --git a/pdr_backend/predictoor/examples/models/main.py b/pdr_backend/predictoor/examples/models/main.py new file mode 100644 index 000000000..8c572e3a7 --- /dev/null +++ b/pdr_backend/predictoor/examples/models/main.py @@ -0,0 +1,230 @@ +import ccxt +import csv +import numpy as np +import os +import pandas as pd +import time +import threading +from datetime import datetime, timedelta, timezone +from threading import Thread + +from pdr_backend.predictoor.examples.models.pdr_model_simple.model import OceanModel +from pdr_backend.predictoor.examples.models.predict import predict_function +from pdr_backend.utils.subgraph import get_all_interesting_prediction_contracts +from pdr_backend.utils.contract import PredictoorContract, Web3Config +from pdr_backend.utils import env + + +last_block_time = 0 +topics = [] + +rpc_url = env.get_rpc_url_or_exit() +subgraph_url = env.get_subgraph_or_exit() +private_key = env.get_private_key_or_exit() +pair_filters = env.get_pair_filter() +timeframe_filter = env.get_timeframe_filter() +source_filter = env.get_source_filter() +owner_addresses = env.get_owner_addresses() + +web3_config = Web3Config(rpc_url, private_key) +owner = web3_config.owner + +exchange_id = 'binance' +pair='BTC/TUSD' +timeframe='5m' +exchange_class = getattr(ccxt, exchange_id) +exchange_ccxt = exchange_class({ + 'timeout': 30000 +}) + +models = [ + OceanModel(exchange_id,pair,timeframe), +] + +def process_block(block, model, main_pd): + global topics + """ Process each contract and if needed, get a prediction, submit it and claim revenue for past epoch """ + if not topics: + topics = get_all_interesting_prediction_contracts( + subgraph_url, + pair_filters, + timeframe_filter, + source_filter, + owner_addresses, + ) + print(f"Got new block: {block['number']} with {len(topics)} topics") + for address in topics: + topic = topics[address] + predictoor_contract = PredictoorContract(web3_config, address) + epoch = predictoor_contract.get_current_epoch() + seconds_per_epoch = predictoor_contract.get_secondsPerEpoch() + seconds_till_epoch_end = ( + epoch * seconds_per_epoch + seconds_per_epoch - block["timestamp"] + ) + print( + f"\t{topic['name']} (at address {topic['address']} is at epoch {epoch}, seconds_per_epoch: {seconds_per_epoch}, seconds_till_epoch_end: {seconds_till_epoch_end}" + ) + + if epoch > topic["last_submited_epoch"] and topic["last_submited_epoch"] > 0: + # let's get the payout for previous epoch. We don't care if it fails... + slot = epoch * seconds_per_epoch - seconds_per_epoch + print( + f"Contract:{predictoor_contract.contract_address} - Claiming revenue for slot:{slot}" + ) + predictoor_contract.payout(slot, False) + + if seconds_till_epoch_end <= int( + os.getenv("SECONDS_TILL_EPOCH_END", 60) + ): + """Timestamp of prediction""" + target_time = (epoch + 2) * seconds_per_epoch + + """Let's fetch the prediction """ + (predicted_value, predicted_confidence) = predict_function( + topic, target_time, model, main_pd + ) + if predicted_value is not None and predicted_confidence > 0: + """We have a prediction, let's submit it""" + stake_amount = os.getenv("STAKE_AMOUNT", 1) * predicted_confidence / 100 # TODO have a customizable function to handle this + print( + f"Contract:{predictoor_contract.contract_address} - Submiting prediction for slot:{target_time}" + ) + predictoor_contract.submit_prediction( + predicted_value, stake_amount, target_time, True + ) + topics[address]["last_submited_epoch"] = epoch + return predicted_value + else: + print( + f"We do not submit, prediction function returned ({predicted_value}, {predicted_confidence})" + ) + + +def log_loop(blockno, model, main_pd): + global last_block_time + block = web3_config.w3.eth.get_block(blockno, full_transactions=False) + if block: + last_block_time = block["timestamp"] + prediction = process_block(block,model,main_pd) + if prediction is not None: + return prediction + +def main(): + print("Starting main loop...") + + ts_now=int( time.time() ) + + results_path = "results" + if not os.path.exists(results_path): + os.makedirs(results_path) + + results_csv_name="./"+results_path+'/'+exchange_id+"_"+models[0].pair+"_"+models[0].timeframe+"_"+str(ts_now)+".csv" + + columns_short = [ + "datetime", + "open", + "high", + "low", + "close", + "volume" + ] + + columns_models = [] + for model in models: + model.unpickle_model("./pdr_backend/predictoor/examples/models") + columns_models.append(model.model_name) # prediction column. 0 or 1 + + all_columns=columns_short+columns_models + + #write csv header for results + size = 0 + try: + files_stats=os.stat(results_csv_name) + size = files_stats.st_size + except: + pass + if size==0: + with open(results_csv_name, 'a') as f: + writer = csv.writer(f) + writer.writerow(all_columns) + + #read initial set of candles + candles = exchange_ccxt.fetch_ohlcv(pair, "5m") + #load past data + main_pd = pd.DataFrame(columns=all_columns) + for ohl in candles: + ohlc= { + 'timestamp':int(ohl[0]/1000), + 'open':float(ohl[1]), + 'close':float(ohl[4]), + 'low':float(ohl[3]), + 'high':float(ohl[2]), + 'volume':float(ohl[5]), + } + main_pd.loc[ohlc["timestamp"]]=ohlc + main_pd["datetime"] = pd.to_datetime(main_pd.index.values, unit="s", utc=True) + + lastblock = 0 + last_finalized_timestamp = 0 + while True: + + candles = exchange_ccxt.fetch_ohlcv(pair, "5m") + + #update last two candles + for ohl in candles[-2:]: + t = int(ohl[0]/1000) + main_pd.loc[t,['datetime']]=pd.to_datetime(t, unit="s", utc=True) + main_pd.loc[t,['open']]=float(ohl[1]) + main_pd.loc[t,['close']]=float(ohl[4]) + main_pd.loc[t,['low']]=float(ohl[3]) + main_pd.loc[t,['high']]=float(ohl[2]) + main_pd.loc[t,['volume']]=float(ohl[5]) + + timestamp = main_pd.index.values[-2] + + block = web3_config.w3.eth.block_number + if block > lastblock: + lastblock = block + + # #we have a new candle + if last_finalized_timestamp Date: Wed, 23 Aug 2023 09:29:12 +0200 Subject: [PATCH 2/4] make black happy --- .../predictoor/examples/models/main.py | 126 ++++++++++-------- .../predictoor/examples/models/predict.py | 6 +- pdr_backend/predictoor/predict.py | 2 +- 3 files changed, 74 insertions(+), 60 deletions(-) diff --git a/pdr_backend/predictoor/examples/models/main.py b/pdr_backend/predictoor/examples/models/main.py index 8c572e3a7..193603994 100644 --- a/pdr_backend/predictoor/examples/models/main.py +++ b/pdr_backend/predictoor/examples/models/main.py @@ -29,18 +29,17 @@ web3_config = Web3Config(rpc_url, private_key) owner = web3_config.owner -exchange_id = 'binance' -pair='BTC/TUSD' -timeframe='5m' +exchange_id = "binance" +pair = "BTC/TUSD" +timeframe = "5m" exchange_class = getattr(ccxt, exchange_id) -exchange_ccxt = exchange_class({ - 'timeout': 30000 -}) +exchange_ccxt = exchange_class({"timeout": 30000}) models = [ - OceanModel(exchange_id,pair,timeframe), + OceanModel(exchange_id, pair, timeframe), ] + def process_block(block, model, main_pd): global topics """ Process each contract and if needed, get a prediction, submit it and claim revenue for past epoch """ @@ -73,9 +72,7 @@ def process_block(block, model, main_pd): ) predictoor_contract.payout(slot, False) - if seconds_till_epoch_end <= int( - os.getenv("SECONDS_TILL_EPOCH_END", 60) - ): + if seconds_till_epoch_end <= int(os.getenv("SECONDS_TILL_EPOCH_END", 60)): """Timestamp of prediction""" target_time = (epoch + 2) * seconds_per_epoch @@ -85,7 +82,9 @@ def process_block(block, model, main_pd): ) if predicted_value is not None and predicted_confidence > 0: """We have a prediction, let's submit it""" - stake_amount = os.getenv("STAKE_AMOUNT", 1) * predicted_confidence / 100 # TODO have a customizable function to handle this + stake_amount = ( + os.getenv("STAKE_AMOUNT", 1) * predicted_confidence / 100 + ) # TODO have a customizable function to handle this print( f"Contract:{predictoor_contract.contract_address} - Submiting prediction for slot:{target_time}" ) @@ -105,80 +104,85 @@ def log_loop(blockno, model, main_pd): block = web3_config.w3.eth.get_block(blockno, full_transactions=False) if block: last_block_time = block["timestamp"] - prediction = process_block(block,model,main_pd) + prediction = process_block(block, model, main_pd) if prediction is not None: return prediction + def main(): print("Starting main loop...") - ts_now=int( time.time() ) + ts_now = int(time.time()) results_path = "results" if not os.path.exists(results_path): os.makedirs(results_path) - results_csv_name="./"+results_path+'/'+exchange_id+"_"+models[0].pair+"_"+models[0].timeframe+"_"+str(ts_now)+".csv" - - columns_short = [ - "datetime", - "open", - "high", - "low", - "close", - "volume" - ] + results_csv_name = ( + "./" + + results_path + + "/" + + exchange_id + + "_" + + models[0].pair + + "_" + + models[0].timeframe + + "_" + + str(ts_now) + + ".csv" + ) + + columns_short = ["datetime", "open", "high", "low", "close", "volume"] columns_models = [] for model in models: model.unpickle_model("./pdr_backend/predictoor/examples/models") - columns_models.append(model.model_name) # prediction column. 0 or 1 + columns_models.append(model.model_name) # prediction column. 0 or 1 - all_columns=columns_short+columns_models + all_columns = columns_short + columns_models - #write csv header for results + # write csv header for results size = 0 try: - files_stats=os.stat(results_csv_name) + files_stats = os.stat(results_csv_name) size = files_stats.st_size except: pass - if size==0: - with open(results_csv_name, 'a') as f: + if size == 0: + with open(results_csv_name, "a") as f: writer = csv.writer(f) writer.writerow(all_columns) - #read initial set of candles + # read initial set of candles candles = exchange_ccxt.fetch_ohlcv(pair, "5m") - #load past data + # load past data main_pd = pd.DataFrame(columns=all_columns) for ohl in candles: - ohlc= { - 'timestamp':int(ohl[0]/1000), - 'open':float(ohl[1]), - 'close':float(ohl[4]), - 'low':float(ohl[3]), - 'high':float(ohl[2]), - 'volume':float(ohl[5]), - } - main_pd.loc[ohlc["timestamp"]]=ohlc - main_pd["datetime"] = pd.to_datetime(main_pd.index.values, unit="s", utc=True) + ohlc = { + "timestamp": int(ohl[0] / 1000), + "open": float(ohl[1]), + "close": float(ohl[4]), + "low": float(ohl[3]), + "high": float(ohl[2]), + "volume": float(ohl[5]), + } + main_pd.loc[ohlc["timestamp"]] = ohlc + main_pd["datetime"] = pd.to_datetime(main_pd.index.values, unit="s", utc=True) lastblock = 0 last_finalized_timestamp = 0 while True: - candles = exchange_ccxt.fetch_ohlcv(pair, "5m") - #update last two candles + # update last two candles for ohl in candles[-2:]: - t = int(ohl[0]/1000) - main_pd.loc[t,['datetime']]=pd.to_datetime(t, unit="s", utc=True) - main_pd.loc[t,['open']]=float(ohl[1]) - main_pd.loc[t,['close']]=float(ohl[4]) - main_pd.loc[t,['low']]=float(ohl[3]) - main_pd.loc[t,['high']]=float(ohl[2]) - main_pd.loc[t,['volume']]=float(ohl[5]) + t = int(ohl[0] / 1000) + main_pd.loc[t, ["datetime"]] = pd.to_datetime(t, unit="s", utc=True) + main_pd.loc[t, ["open"]] = float(ohl[1]) + main_pd.loc[t, ["close"]] = float(ohl[4]) + main_pd.loc[t, ["low"]] = float(ohl[3]) + main_pd.loc[t, ["high"]] = float(ohl[2]) + main_pd.loc[t, ["volume"]] = float(ohl[5]) timestamp = main_pd.index.values[-2] @@ -187,7 +191,7 @@ def main(): lastblock = block # #we have a new candle - if last_finalized_timestamp Date: Wed, 23 Aug 2023 09:32:50 +0200 Subject: [PATCH 3/4] make pylint happy --- .pylintrc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.pylintrc b/.pylintrc index 7ae6c37a3..9baa275a7 100644 --- a/.pylintrc +++ b/.pylintrc @@ -34,7 +34,8 @@ ignore= LICENSE, docker-compose.yml, Dockerfile, - entrypoint.sh + entrypoint.sh, + pdr_backend.egg-info, # Add files or directories matching the regex patterns to the blacklist. The # regex matches against base names, not paths. From e9db8ec87f031ed0da55f6cc1ec8fc9348ae15d3 Mon Sep 17 00:00:00 2001 From: trentmc Date: Wed, 23 Aug 2023 10:09:02 +0200 Subject: [PATCH 4/4] make mypy happy --- mypy.ini | 35 ++++++++++++------- .../predictoor/examples/models/main.py | 3 +- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/mypy.ini b/mypy.ini index 52311f65a..19b3869b3 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,47 +1,56 @@ [mypy] exclude = venv|codacy-analysis-cli-master -[mypy-eth_account.*] +[mypy-artifacts.*] ignore_missing_imports = True -[mypy-eth_keys.*] +[mypy-addresses.*] +ignore_missing_imports = True + +[mypy-ccxt.*] +ignore_missing_imports = True + +[mypy-ecies.*] ignore_missing_imports = True [mypy-enforce_typing.*] ignore_missing_imports = True +[mypy-eth_account.*] +ignore_missing_imports = True + +[mypy-eth_keys.*] +ignore_missing_imports = True + [mypy-matplotlib.*] ignore_missing_imports = True [mypy-numpy.*] ignore_missing_imports = True -[mypy-pylab.*] +[mypy-ocean_lib.*] ignore_missing_imports = True [mypy-pandas.*] ignore_missing_imports = True -[mypy-scipy.*] +[mypy-pylab.*] ignore_missing_imports = True -[mypy-ccxt.*] +[mypy-pytest] ignore_missing_imports = True -[mypy-ecies.*] +[mypy-scipy.*] ignore_missing_imports = True -[mypy-setuptools.*] +[mypy-sapphirepy.*] ignore_missing_imports = True -[mypy-artifacts.*] +[mypy-setuptools.*] ignore_missing_imports = True -[mypy-addresses.*] +[mypy-web3.*] ignore_missing_imports = True -[mypy-sapphirepy.*] +[mypy-pdr_backend.predictoor.examples.*] ignore_missing_imports = True - -[mypy-ocean_lib.*] -ignore_missing_imports = True \ No newline at end of file diff --git a/pdr_backend/predictoor/examples/models/main.py b/pdr_backend/predictoor/examples/models/main.py index 193603994..37364a84c 100644 --- a/pdr_backend/predictoor/examples/models/main.py +++ b/pdr_backend/predictoor/examples/models/main.py @@ -7,6 +7,7 @@ import threading from datetime import datetime, timedelta, timezone from threading import Thread +from typing import List from pdr_backend.predictoor.examples.models.pdr_model_simple.model import OceanModel from pdr_backend.predictoor.examples.models.predict import predict_function @@ -16,7 +17,7 @@ last_block_time = 0 -topics = [] +topics: List[dict] = [] rpc_url = env.get_rpc_url_or_exit() subgraph_url = env.get_subgraph_or_exit()