From c193c361eb6bce7f46b83ad667a121f3240ef2b0 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Fri, 5 Jan 2018 21:20:48 -0800 Subject: [PATCH 01/25] Abstract ethgasstation for microservice use Begins abstractions for the adaptive oracle to be run as a configurable microservice, and to not assume a specific architecture/configuration to be the case. --- .gitignore | 104 +++++++++++++++++++++ egs/__init__.py | 0 egs_ref.py => egs/egs_ref.py | 25 +++-- modelparams.py => egs/modelparams.py | 0 egs/settings.py | 39 ++++++++ ethgasstation.py | 133 +++++++++++++++++---------- model_gas.py | 38 ++++++-- settings.conf | 13 +++ 8 files changed, 289 insertions(+), 63 deletions(-) create mode 100644 .gitignore create mode 100644 egs/__init__.py rename egs_ref.py => egs/egs_ref.py (97%) rename modelparams.py => egs/modelparams.py (100%) create mode 100644 egs/settings.py create mode 100644 settings.conf diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..af2f537 --- /dev/null +++ b/.gitignore @@ -0,0 +1,104 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +.static_storage/ +.media/ +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/egs/__init__.py b/egs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/egs_ref.py b/egs/egs_ref.py similarity index 97% rename from egs_ref.py rename to egs/egs_ref.py index 442778b..9777d56 100755 --- a/egs_ref.py +++ b/egs/egs_ref.py @@ -1,8 +1,17 @@ -import pandas as pd -import numpy as np +""" +egs_ref.py + +Utility functions, MySQL Schemas, and other such architecture +for the EthGasStation adaptive oracle. 
+""" + import json -import urllib import time +import urllib + +import numpy as np +import pandas as pd + from sqlalchemy import create_engine, Column, Integer, String, DECIMAL, BigInteger from sqlalchemy.ext.declarative import declarative_base @@ -99,7 +108,7 @@ class Block_Data(Base): main = Column(Integer) block_number = Column(Integer) -class Timers(): +class Timers(object): """ class to keep track of time relative to network block also tracks low mined price from reports @@ -117,11 +126,11 @@ def update_time(self, block): def add_block(self, block_number, block_time): self.block_store[block_number] = block_time - + def read_block_time(self, block_number): return self.block_store.pop(block_number, None) -class CleanTx(): +class CleanTx(object): """transaction object / methods for pandas""" def __init__(self, tx_obj, block_posted=None, time_posted=None, miner=None): self.hash = tx_obj.hash @@ -153,7 +162,7 @@ def round_gp_10gwei(self): gp = 0 self.gp_10gwei = gp -class CleanBlock(): +class CleanBlock(object): """block object/methods for pandas""" def __init__(self, block_obj, main, uncle, timemined, mingasprice=None, numtx = None, weightedgp=None, includedblock=None): self.block_number = block_obj.number @@ -170,7 +179,7 @@ def __init__(self, block_obj, main, uncle, timemined, mingasprice=None, numtx = self.uncle = uncle self.includedblock = includedblock self.speed = self.gasused / self.gaslimit - + def to_dataframe(self): data = {0:{'block_number':self.block_number, 'gasused':self.gasused, 'miner':self.miner, 'gaslimit':self.gaslimit, 'numtx':self.numtx, 'blockhash':self.blockhash, 'time_mined':self.time_mined, 'mingasprice':self.mingasprice, 'uncsreported':self.uncsreported, 'blockfee':self.blockfee, 'main':self.main, 'uncle':self.uncle, 'speed':self.speed, 'includedblock':self.includedblock}} return pd.DataFrame.from_dict(data, orient='index') diff --git a/modelparams.py b/egs/modelparams.py similarity index 100% rename from modelparams.py rename to egs/modelparams.py diff --git a/egs/settings.py b/egs/settings.py new file mode 100644 index 0000000..66f4959 --- /dev/null +++ b/egs/settings.py @@ -0,0 +1,39 @@ +import configparser +import os + +parser_instance = None + +def load_settings(settings_file): + """Load settings from a settings file.""" + global parser_instance + + """Get settings from INI configuration file.""" + parser_instance = configparser.ConfigParser() + parser_instance.read(settings_file) + +def get_setting(section, name): + """Get a setting.""" + global parser_instance + + if section in parser_instance: + if name in parser_instance[section]: + print(parser_instance[section][name]) + return parser_instance[section][name] + raise KeyError("Could not find setting %s.%s in configuration." % (section, name)) + +def get_settings_filepath(curdir): + """Find a valid configuration file. + Order of priority: + 1. ./settings.conf + 2. /etc/ethgasstation/settings.conf + 3. 
/etc/ethgasstation.conf""" + ap = os.path.abspath(curdir) + if os.path.isfile(os.path.join(ap, 'settings.conf')): + return os.path.join(ap, 'settings.conf') + elif os.path.isdir('/etc'): + if os.path.isdir('/etc/ethgasstation') and \ + os.path.isfile('/etc/ethgasstation/settings.conf'): + return '/etc/ethgasstation/settings.conf' + elif os.path.isfile('/etc/ethgasstation.conf'): + return '/etc/ethgasstation.conf' + raise FileNotFoundError("Cannot find EthGasStation settings file.") diff --git a/ethgasstation.py b/ethgasstation.py index 2d3a9ed..5c9b952 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -1,26 +1,57 @@ -import time -import sys +#!/usr/bin/env python3 +""" +ethgasstation.py + +EthGasStation adaptive oracle. +For more information, see README.md. +""" + import json import math -import traceback import os -import pandas as pd +import sys +import time +import traceback + import numpy as np -from web3 import Web3, HTTPProvider +import pandas as pd from sqlalchemy import create_engine, Column, Integer, String, DECIMAL, BigInteger, text from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base -from egs_ref import * -import modelparams +from web3 import Web3, HTTPProvider -web3 = Web3(HTTPProvider('http://localhost:8545')) -engine = create_engine( - 'mysql+mysqlconnector://ethgas:station@127.0.0.1:3306/tx', echo=False) +# internal libraries +from egs.egs_ref import * +from egs.settings import load_settings, get_setting, get_settings_filepath +import egs.modelparams as modelparams + +# load settings from configuration +settings_file = get_settings_filepath(os.path.dirname(os.path.realpath(__file__))) +load_settings(settings_file) + +# configure necessary services +web3 = Web3( + HTTPProvider( + "%s://%s:%s" % ( + get_setting('geth', 'protocol'), + get_setting('geth', 'hostname'), + get_setting('geth', 'port')))) + +connstr = "mysql+mysqlconnector://%s:%s@%s:%s/%s" % ( + get_setting('mysql', 'username'), + get_setting('mysql', 'password'), + get_setting('mysql', 'hostname'), + get_setting('mysql', 'port'), + get_setting('mysql', 'database') + ) +engine = create_engine(connstr, echo=False) + +# create tables, sql session Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() - + def init_dfs(): """load data from mysql""" @@ -66,7 +97,7 @@ def write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitt try: txpool_by_gp = txpool_by_gp.rename(columns={'gas_price':'count'}) txpool_by_gp['gasprice'] = txpool_by_gp['round_gp_10gwei']/10 - + prediction_table['gasprice'] = prediction_table['gasprice']/10 analyzed_block_show = analyzed_block.loc[analyzed_block['chained']==0].copy() analyzed_block_show['gasprice'] = analyzed_block_show['round_gp_10gwei']/10 @@ -74,6 +105,9 @@ def write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitt analyzed_blockout = analyzed_block_show.to_json(orient='records') prediction_tableout = prediction_table.to_json(orient='records') txpool_by_gpout = txpool_by_gp.to_json(orient='records') + + # TODO: abstract these filepaths out to a datastore + # The microservice should not assume filesystem information parentdir = os.path.dirname(os.getcwd()) filepath_gprecs = parentdir + '/json/ethgasAPI.json' filepath_txpool_gp = parentdir + '/json/memPool.json' @@ -90,17 +124,17 @@ def write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitt with open(filepath_analyzedblock, 'w') as outfile: outfile.write(analyzed_blockout) - + if not 
submitted_hourago.empty: submitted_hourago = submitted_hourago.to_json(orient='records') filepath_hourago = parentdir + '/json/hourago.json' with open(filepath_hourago, 'w') as outfile: outfile.write(submitted_hourago) - + except Exception as e: print(e) - -def get_txhases_from_txpool(block): + +def get_txhashes_from_txpool(block): """gets list of all txhash in txpool at block and returns dataframe""" hashlist = [] txpoolcontent = web3.txpool.content @@ -116,7 +150,7 @@ def process_block_transactions(block): """get tx data from block""" block_df = pd.DataFrame() block_obj = web3.eth.getBlock(block, True) - miner = block_obj.miner + miner = block_obj.miner for transaction in block_obj.transactions: clean_tx = CleanTx(transaction, None, None, miner) block_df = block_df.append(clean_tx.to_dataframe(), ignore_index = False) @@ -162,11 +196,11 @@ def check_10th(gasprice, gp_mined_10th): return x def check_5mago(gasprice, submitted_5mago): - + submitted_5mago.loc[(submitted_5mago['still_here'] >= 1) & (submitted_5mago['still_here'] <= 2) & (submitted_5mago['total'] < 4), 'pct_unmined'] = np.nan maxval = submitted_5mago.loc[submitted_5mago.index > gasprice, 'pct_unmined'].max() - + if gasprice in submitted_5mago.index: stillh = submitted_5mago.get_value(gasprice, 'still_here') if stillh > 2: @@ -175,7 +209,7 @@ def check_5mago(gasprice, submitted_5mago): rval = maxval else: rval = maxval - + if gasprice >= 1000: rval = 0 @@ -184,11 +218,11 @@ def check_5mago(gasprice, submitted_5mago): return maxval def check_hourago(gasprice, submitted_hourago): - + submitted_hourago.loc[(submitted_hourago['still_here'] >= 1) & (submitted_hourago['still_here'] <= 2) & (submitted_hourago['total'] <= 5), 'pct_unmined'] = np.nan maxval = submitted_hourago.loc[submitted_hourago.index > gasprice, 'pct_unmined'].max() - + if gasprice in submitted_hourago.index: stillh = submitted_hourago.get_value(gasprice, 'still_here') if stillh > 2: @@ -197,7 +231,7 @@ def check_hourago(gasprice, submitted_hourago): rval = maxval else: rval = maxval - + if gasprice >= 1000: rval = 0 @@ -210,7 +244,7 @@ def check_hourago(gasprice, submitted_hourago): def predict(row): if row['chained'] == 1: return np.nan - + #set in modelparams try: sum1 = (modelparams.INTERCEPT + (row['hashpower_accepting'] * modelparams.HPA_COEF) + (row['tx_atabove'] * modelparams.TXATABOVE_COEF) + (row['hgXhpa'] * modelparams.INTERACT_COEF) + (row['highgas2'] * modelparams.HIGHGAS_COEF)) @@ -269,7 +303,7 @@ def analyze_txpool(block, txpool, alltx, hashpower, avg_timemined, gaslimit, gp_ #merge transaction data for txpool transactions #txpool_block only has transactions received by filter txpool_block = txpool_block.join(alltx, how='inner') - + #txpool_block = txpool_block[~txpool_block.index.duplicated(keep = 'first')] assert txpool_block.index.duplicated(keep='first').sum() == 0 @@ -277,12 +311,12 @@ def analyze_txpool(block, txpool, alltx, hashpower, avg_timemined, gaslimit, gp_ txpool_block['num_to'] = txpool_block.groupby('to_address')['block_posted'].transform('count') txpool_block['ico'] = (txpool_block['num_to'] > 90).astype(int) txpool_block['dump'] = (txpool_block['num_from'] > 5).astype(int) - + #new dfs grouped by gasprice and nonce txpool_by_gp = txpool_block[['gas_price', 'round_gp_10gwei']].groupby('round_gp_10gwei').agg({'gas_price':'count'}) txpool_block_nonce = txpool_block[['from_address', 'nonce']].groupby('from_address').agg({'nonce':'min'}) - #when multiple tx from same from address, finds tx with lowest nonce (unchained) - others are 
'chained' + #when multiple tx from same from address, finds tx with lowest nonce (unchained) - others are 'chained' txpool_block['chained'] = txpool_block.apply(check_nonce, args=(txpool_block_nonce,), axis=1) #predictiontable @@ -342,7 +376,7 @@ def analyze_txpool(block, txpool, alltx, hashpower, avg_timemined, gaslimit, gp_ return(txpool_block, txpool_by_gp, predictTable) def get_gasprice_recs(prediction_table, block_time, block, speed, minlow=-1, submitted_hourago=None): - + def get_safelow(minlow, submitted_hourago): series = prediction_table.loc[prediction_table['expectedTime'] <= 30, 'gasprice'] safelow = series.min() @@ -352,7 +386,7 @@ def get_safelow(minlow, submitted_hourago): if minlow >= 0: if safelow < minlow: safelow = minlow - + return float(safelow) def get_average(safelow): @@ -384,7 +418,7 @@ def get_fastest(): minhash_list = prediction_table.loc[prediction_table['hashpower_accepting']>95, 'gasprice'] if fastest < minhash_list.min(): fastest = minhash_list.min() - return float(fastest) + return float(fastest) def get_wait(gasprice): try: @@ -393,7 +427,7 @@ def get_wait(gasprice): wait = 0 wait = round(wait, 1) return float(wait) - + gprecs = {} gprecs['safeLow'] = get_safelow(minlow, submitted_hourago) gprecs['safeLowWait'] = get_wait(gprecs['safeLow']) @@ -418,16 +452,16 @@ def master_control(): snapstore = pd.DataFrame() print ('blocks '+ str(len(blockdata))) print ('txcount '+ str(len(alltx))) - timer = Timers(web3.eth.blockNumber) + timer = Timers(web3.eth.blockNumber) start_time = time.time() tx_filter = web3.eth.filter('pending') - + def append_new_tx(clean_tx): nonlocal alltx if not clean_tx.hash in alltx.index: alltx = alltx.append(clean_tx.to_dataframe(), ignore_index = False) - + def update_dataframes(block): @@ -451,11 +485,11 @@ def update_dataframes(block): #process block data block_sumdf = process_block_data(mined_blockdf, block_obj) - #add block data to block dataframe + #add block data to block dataframe blockdata = blockdata.append(block_sumdf, ignore_index = True) - #get list of txhashes from txpool - current_txpool = get_txhases_from_txpool(block) + #get list of txhashes from txpool + current_txpool = get_txhashes_from_txpool(block) #add txhashes to txpool dataframe txpool = txpool.append(current_txpool, ignore_index = False) @@ -497,7 +531,7 @@ def update_dataframes(block): assert analyzed_block.index.duplicated().sum()==0 alltx = alltx.combine_first(analyzed_block) - + #with pd.option_context('display.max_columns', None,): #print(analyzed_block) @@ -518,10 +552,10 @@ def update_dataframes(block): (blockdata, alltx, txpool) = prune_data(blockdata, alltx, txpool, block) return True - except: - print(traceback.format_exc()) + except: + print(traceback.format_exc()) + - while True: try: new_tx_list = web3.eth.getFilterChanges(tx_filter.filter_id) @@ -531,23 +565,26 @@ def update_dataframes(block): block = web3.eth.blockNumber timestamp = time.time() if (timer.process_block > (block - 5)): - for new_tx in new_tx_list: - try: + for new_tx in new_tx_list: + try: tx_obj = web3.eth.getTransaction(new_tx) clean_tx = CleanTx(tx_obj, block, timestamp) append_new_tx(clean_tx) except Exception as e: pass if (timer.process_block < block): - + if block > timer.start_block+1: print('current block ' +str(block)) print ('processing block ' + str(timer.process_block)) updated = update_dataframes(timer.process_block) print ('finished ' + str(timer.process_block)) timer.process_block = timer.process_block + 1 - - - -master_control() + +def main(): + """int main""" + 
master_control() + +if __name__ == "__main__": + main() diff --git a/model_gas.py b/model_gas.py index 467dc53..1a7cb5c 100755 --- a/model_gas.py +++ b/model_gas.py @@ -1,20 +1,44 @@ #analysis: Run poission regression models +import json +import math +import os +import re +import sys +import subprocess +import urllib + import mysql.connector -import pandas as pd import numpy as np +import pandas as pd + import statsmodels.api as sm -import math -import sys -import os, subprocess, re -import urllib,json + from sqlalchemy import create_engine from patsy import dmatrices -cnx = mysql.connector.connect(user='ethgas', password='station', host='127.0.0.1', database='tx') + +from egs.settings import get_settings_filepath, load_settings, get_setting + +settings_file = get_settings_filepath(os.path.dirname(os.path.realpath(__file__))) +load_settings(settings_file) + +# TODO: choose one mysql or the other +cnx = mysql.connector.connect( + user=get_setting('mysql', 'username'), + password=get_setting('mysql', 'password'), + host=get_setting('mysql', 'hostname'), + port=get_setting('mysql', 'port'), + database=get_setting('mysql', 'database')) cursor = cnx.cursor() engine = create_engine( - 'mysql+mysqlconnector://ethgas:station@127.0.0.1:3306/tx', echo=False) + "mysql+mysqlconnector://%s:%s@%s:%s/%s" % ( + get_setting('mysql', 'username'), + get_setting('mysql', 'password'), + get_setting('mysql', 'hostname'), + get_setting('mysql', 'port'), + get_setting('mysql', 'database') + ), echo=False) query = ("SELECT * FROM minedtx2") cursor.execute(query) diff --git a/settings.conf b/settings.conf new file mode 100644 index 0000000..607e99a --- /dev/null +++ b/settings.conf @@ -0,0 +1,13 @@ +# EthGasStation config + +[mysql] + hostname = 127.0.0.1 + port = 3306 + username = ethgas + password = station + database = tx + +[geth] + hostname = localhost + port = 8545 + protocol = http \ No newline at end of file From e3a12795b9bbdc7225dc9da2df055d204965d1f2 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Fri, 5 Jan 2018 21:39:05 -0800 Subject: [PATCH 02/25] Add JSON abstraction, update README JSON files may now be saved to arbitrary directories. --- README.md | 33 ++++++++++++++++++++++++++------- ethgasstation.py | 9 +++++++++ settings.conf | 5 ++++- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index afc7952..56bf989 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,33 @@ # ethgasstation -Adaptive Gas Price Oracle for Ethereum Blockchain +#### an adaptive gas price oracle for the ethereum blockchain -This python script is designed to monitor a local ethereum node (geth, does not work right now with Parity). It will record data about pending and mined transactions, including the transactions in your nodes transaction pool. Its main purpose is to generate adaptive gas price estimates that enable you to know what gas price to use depending on your confirmation time needs. It generates these estimates based on miner policy estimates as well as the number of transactions in the txpool and the gas offered by the transaction. +This is the backend for [ethgasstation](https://ethgasstation.info), written in +Python 3. This python script is designed to monitor a local Geth node. It will +record data about pending and mined transactions, including the transactions in +your node's transaction pool. Its main purpose is to generate adaptive gas price +estimates that enable you to know what gas price to use depending on your +confirmation time needs. 
It generates these estimates based on miner policy
+estimates as well as the number of transactions in the txpool and the gas
+offered by the transaction.
-It also stores transaction data in a mysql database. Create a user named 'ethgas' password 'station' and add an empty database 'tx'. This allows you to run the model_gas.py script to reestimate the regression model used to generate the predictions.
+The basic strategy is to use statistical modelling to predict confirmation times
+at all gas prices from 0-100 gwei at the current state of the txpool and minimum
+gas prices accepted in blocks over the last 200 blocks. Then, it selects the
+gas price that gives the desired confirmation time assuming standard gas offered
+(higher than 1m gas is slower).
-The basic strategy is to use statistical modelling to predict confirmation times at all gas prices from 0-100 gwei at the current state of the txpool and minimum gas prices accepted in blocks over the last 200 blocks. Then, it selects the gas price that gives the desired confirmation time assuming standard gas offered (higher than 1m gas is slower).
-Note: you need to create a folder 'json' in the parent directory and then a new json file will be written to the folder each block containing the gas price predictions and the prediction table. The gas price in ethgasAPI is currently in 10gwei units, so divide them by 10 to get in gwei.
+### Installation and Prerequisites
-usage: `python3 ethgasstation.py`
+ethgasstation requires **Python 3**, **MySQL/MariaDB**, and **Geth**. You will
+need to modify `settings.conf` for your specific environment; some (insecure)
+defaults are set to get you up and running.
-requirements: `pip3 install -r requirements.txt`
+The oracle outputs JSON files. These files are stored in the output
+directory specified by the `settings.conf` file. In the future the Oracle
+will support saving JSON blobs to cloud object storage, Redis, MySQL, etc.
+
+
+### Usage
+
+1. Install requirements using `pip3 install -r requirements.txt`
+2. Run `./ethgasstation.py` or `python3 ethgasstation.py`.
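
[Editor's note: a minimal consumer sketch, not part of the patch series, illustrating the JSON output described in the README above. It assumes the default file exporter (`output_directory = ./json` from `settings.conf`) and the key names set by `get_gasprice_recs()` in PATCH 01; per the pre-rewrite README, API gas prices are reported in 10-gwei units.]

```python
import json

# Hypothetical reader for the oracle's main output file; the path assumes
# the default ./json output directory configured in settings.conf.
with open('./json/ethgasAPI.json') as infile:
    gprecs = json.load(infile)

# Prices are in 10-gwei units, so divide by 10 to get gwei;
# safeLowWait is an expected confirmation time in minutes.
print('safeLow: %.1f gwei' % (gprecs['safeLow'] / 10))
print('safeLowWait: %.1f minutes' % gprecs['safeLowWait'])
```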
diff --git a/ethgasstation.py b/ethgasstation.py index 5c9b952..9a5c0e3 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -29,6 +29,15 @@ settings_file = get_settings_filepath(os.path.dirname(os.path.realpath(__file__))) load_settings(settings_file) +json_output_dir = os.path.abspath(get_setting('json', 'output_directory')) +if not os.path.isdir(json_output_dir): + # XXX should be on stderr + print("WARN: Could not find output directory %s" % json_output_dir) + print("WARN: Making directory tree.") + # attempt to make dirs + os.makedirs(json_output_dir, exist_ok=True) +sys.exit(0) + # configure necessary services web3 = Web3( HTTPProvider( diff --git a/settings.conf b/settings.conf index 607e99a..4e59ad9 100644 --- a/settings.conf +++ b/settings.conf @@ -10,4 +10,7 @@ [geth] hostname = localhost port = 8545 - protocol = http \ No newline at end of file + protocol = http + +[json] + output_directory = ./json \ No newline at end of file From ee99571d60e63fc1757c8415d0b189bfe245622e Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Fri, 5 Jan 2018 21:42:54 -0800 Subject: [PATCH 03/25] Actually use JSON abstraction --- ethgasstation.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/ethgasstation.py b/ethgasstation.py index 9a5c0e3..725f198 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -103,6 +103,8 @@ def write_to_sql(alltx, analyzed_block, block_sumdf, mined_blockdf, block): def write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitted_hourago=None): """write json data""" + global json_output_dir + try: txpool_by_gp = txpool_by_gp.rename(columns={'gas_price':'count'}) txpool_by_gp['gasprice'] = txpool_by_gp['round_gp_10gwei']/10 @@ -115,13 +117,11 @@ def write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitt prediction_tableout = prediction_table.to_json(orient='records') txpool_by_gpout = txpool_by_gp.to_json(orient='records') - # TODO: abstract these filepaths out to a datastore - # The microservice should not assume filesystem information - parentdir = os.path.dirname(os.getcwd()) - filepath_gprecs = parentdir + '/json/ethgasAPI.json' - filepath_txpool_gp = parentdir + '/json/memPool.json' - filepath_prediction_table = parentdir + '/json/predictTable.json' - filepath_analyzedblock = parentdir + '/json/txpoolblock.json' + filepath_gprecs = os.path.join(json_output_dir, 'ethgasAPI.json') + filepath_txpool_gp = os.path.join(json_output_dir, 'memPool.json') + filepath_prediction_table = os.path.join(json_output_dir, 'predictTable.json') + filepath_analyzedblock = os.path.join(json_output_dir, 'txpoolblock.json') + with open(filepath_gprecs, 'w') as outfile: json.dump(gprecs, outfile) @@ -136,10 +136,11 @@ def write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitt if not submitted_hourago.empty: submitted_hourago = submitted_hourago.to_json(orient='records') - filepath_hourago = parentdir + '/json/hourago.json' + filepath_hourago = os.path.join(json_output_dir, '/hourago.json') with open(filepath_hourago, 'w') as outfile: outfile.write(submitted_hourago) + except Exception as e: print(e) From 59ff11867330d7fd19f29e839387ed9812fbdc60 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sat, 6 Jan 2018 13:39:47 -0800 Subject: [PATCH 04/25] port egs's new fixes to develop branch --- egs/egs_ref.py | 4 ++-- ethgasstation.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/egs/egs_ref.py b/egs/egs_ref.py index 
9777d56..76d1ee8 100755 --- a/egs/egs_ref.py +++ b/egs/egs_ref.py @@ -153,10 +153,10 @@ def round_gp_10gwei(self): """Rounds the gas price to gwei""" gp = self.gas_price/1e8 if gp >= 1 and gp < 10: - gp = np.floor(gp) + gp = np.ceil(gp) elif gp >= 10: gp = gp/10 - gp = np.floor(gp) + gp = np.ceil(gp) gp = gp*10 else: gp = 0 diff --git a/ethgasstation.py b/ethgasstation.py index 725f198..51e8697 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -574,7 +574,7 @@ def update_dataframes(block): new_tx_list = web3.eth.getFilterChanges(tx_filter.filter_id) block = web3.eth.blockNumber timestamp = time.time() - if (timer.process_block > (block - 5)): + if (timer.process_block > (block - 8)): for new_tx in new_tx_list: try: tx_obj = web3.eth.getTransaction(new_tx) From 3ffa77aefe5eb76f6145a5f37853c2d5c6c8e9b9 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sat, 6 Jan 2018 15:28:53 -0800 Subject: [PATCH 05/25] Abstract JSON exportation to file, redis datastores --- egs/jsonexporter.py | 118 ++++++++++++++++++++++++++++++++++++++++++++ egs/settings.py | 7 +++ ethgasstation.py | 32 ++++-------- requirements.txt | 1 + settings.conf | 3 +- 5 files changed, 137 insertions(+), 24 deletions(-) create mode 100644 egs/jsonexporter.py diff --git a/egs/jsonexporter.py b/egs/jsonexporter.py new file mode 100644 index 0000000..19338aa --- /dev/null +++ b/egs/jsonexporter.py @@ -0,0 +1,118 @@ +""" +JSON Exporter. + +Will export JSON to files or K/V store for microservices +to pick up and use. Used to generate the JSON data used by the EGS +v0 API, and similar legacy services. +""" + +import json +import os +from urllib.parse import urlparse + +import redis +from .settings import settings_file_loaded, get_setting + +class JSONExporter(object): + """JSON exporter main class. Allows for export of JSON strings to various + backing data stores.""" + + redis_key_prefix = "" + supported_types = ['file', 'redis'] + + redis = None + + export_type = None + export_location = None + + + def __init__(self, export_type=None, export_location=None): + if export_type is None: + self._get_export_options_from_settings() + else: + self._check_export_type(export_type) + self.export_type = export_type + self.export_location = export_location + + def write_json(self, key, object_or_str): + """Writes JSON to supported endpoint.""" + if self.export_type == 'file': + self._write_json_file(key, object_or_str) + elif self.export_type == 'redis': + self._write_json_redis(key, object_or_str) + + def _write_json_file(self, key, object_or_str): + """Writes JSON to filesystem.""" + if not os.path.isdir(self.export_location): + raise JSONExporterException( + "Cannot write to output dir %s, doesn't exist." % + (self.export_location)) + + json_str = self._serialize(object_or_str) + output_path = os.path.join(self.export_location, "%s.json" % (key)) + with open(output_path, 'w') as fd: + fd.write(json_str) + + def _write_json_redis(self, key, object_or_str): + """Writes JSON to Redis store.""" + # self.export_location should be parseable + conn = self._connect_redis() + key = "%s_%s" % (self.redis_key_prefix, key) + json_str = self._serialize(object_or_str) + conn.set(key, json_str) + + def _check_export_type(self, export_type): + """Checks for a valid export type. 
Raises Error if not found.""" + if not export_type in self.supported_types: + raise JSONExporterException("JSONExporter does not support type %s" % export_type) + + def _serialize(self, mixed): + """Serializes mixed to JSON.""" + if isinstance(mixed, str): + # serialize to validate is JSON + mixed = json.loads(mixed) + return json.dumps(mixed) + + def _connect_redis(self, force_reconnect=False): + """Connect to redis. Saves connection as self.redis.""" + if self.redis is None or force_reconnect is True: + loc = urlparse(self.export_location) + if loc.scheme == 'unix': + unix_socket_path = loc.path + conn = redis.Redis(unix_socket_path=unix_socket_path) + else: + hostname = loc.hostname + port = loc.port + if port is None: + port = 6379 # default redis port + if loc.password is None: + conn = redis.Redis(host=hostname, port=int(port)) + else: + conn = redis.Redis(host=hostname, port=int(port), + password=loc.password) + self.redis = conn + return self.redis + + def _get_export_options_from_settings(self): + """Get export options from default settings.""" + if settings_file_loaded() is False: + raise JSONExporterException("JSONExporter can't get settings.") + export_type = get_setting("json", "output_type") + self._check_export_type(export_type) + self.export_type = export_type + export_location = get_setting("json", "output_location") + if export_type == 'file': + export_location = os.path.abspath(export_location) + if not os.path.isdir(export_location): + # XXX should be on stderr + print("WARN: Could not find output directory %s" % export_location) + print("WARN: Making directory tree.") + # attempt to make dirs + os.makedirs(export_location, exist_ok=True) + + self.export_location = export_location + + +class JSONExporterException(Exception): + """Generic exception for JSON Exporter.""" + pass diff --git a/egs/settings.py b/egs/settings.py index 66f4959..0e85d3c 100644 --- a/egs/settings.py +++ b/egs/settings.py @@ -2,14 +2,21 @@ import os parser_instance = None +settings_loaded = False + +def settings_file_loaded(): + global settings_loaded + return settings_loaded is True def load_settings(settings_file): """Load settings from a settings file.""" global parser_instance + global settings_loaded """Get settings from INI configuration file.""" parser_instance = configparser.ConfigParser() parser_instance.read(settings_file) + settings_loaded = True def get_setting(section, name): """Get a setting.""" diff --git a/ethgasstation.py b/ethgasstation.py index 51e8697..bd20ea5 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -23,6 +23,8 @@ # internal libraries from egs.egs_ref import * from egs.settings import load_settings, get_setting, get_settings_filepath +from egs.jsonexporter import JSONExporter + import egs.modelparams as modelparams # load settings from configuration @@ -36,7 +38,6 @@ print("WARN: Making directory tree.") # attempt to make dirs os.makedirs(json_output_dir, exist_ok=True) -sys.exit(0) # configure necessary services web3 = Web3( @@ -100,10 +101,9 @@ def write_to_sql(alltx, analyzed_block, block_sumdf, mined_blockdf, block): analyzed_block.to_sql(con=engine, name='txpool_current', index=False, if_exists='replace') block_sumdf.to_sql(con=engine, name='blockdata2', if_exists='append', index=False) - def write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitted_hourago=None): """write json data""" - global json_output_dir + exporter = JSONExporter() try: txpool_by_gp = txpool_by_gp.rename(columns={'gas_price':'count'}) @@ -117,29 +117,14 @@ def 
write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitt prediction_tableout = prediction_table.to_json(orient='records') txpool_by_gpout = txpool_by_gp.to_json(orient='records') - filepath_gprecs = os.path.join(json_output_dir, 'ethgasAPI.json') - filepath_txpool_gp = os.path.join(json_output_dir, 'memPool.json') - filepath_prediction_table = os.path.join(json_output_dir, 'predictTable.json') - filepath_analyzedblock = os.path.join(json_output_dir, 'txpoolblock.json') - - with open(filepath_gprecs, 'w') as outfile: - json.dump(gprecs, outfile) - - with open(filepath_prediction_table, 'w') as outfile: - outfile.write(prediction_tableout) - - with open(filepath_txpool_gp, 'w') as outfile: - outfile.write(txpool_by_gpout) - - with open(filepath_analyzedblock, 'w') as outfile: - outfile.write(analyzed_blockout) + exporter.write_json('ethgasAPI', gprecs) + exporter.write_json('memPool', txpool_by_gpout) + exporter.write_json('predictTable', prediction_tableout) + exporter.write_json('txpoolblock', analyzed_blockout) if not submitted_hourago.empty: submitted_hourago = submitted_hourago.to_json(orient='records') - filepath_hourago = os.path.join(json_output_dir, '/hourago.json') - with open(filepath_hourago, 'w') as outfile: - outfile.write(submitted_hourago) - + exporter.write_json('hourago', submitted_hourago)= except Exception as e: print(e) @@ -296,6 +281,7 @@ def analyze_last200blocks(block, blockdata): if np.isnan(avg_timemined): avg_timemined = 30 return(hashpower, avg_timemined, gaslimit, speed) + def analyze_last5blocks(block, alltx): recent_blocks= alltx.loc[(alltx['block_mined'] >= (block-5)) & (alltx['block_mined'] <= (block))] gp_mined_10th = recent_blocks['gas_price'].quantile(.1) diff --git a/requirements.txt b/requirements.txt index f8f3bb6..5ba2885 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,7 @@ pylru==1.0.9 pysha3==1.0.2 python-dateutil==2.6.1 pytz==2017.3 +redis==2.10.6 requests==2.18.4 rlp==0.6.0 scipy==1.0.0 diff --git a/settings.conf b/settings.conf index 4e59ad9..43633d2 100644 --- a/settings.conf +++ b/settings.conf @@ -13,4 +13,5 @@ protocol = http [json] - output_directory = ./json \ No newline at end of file + output_type = file + output_location = ./json From da5a6616386bd56528af3b32b8e3bb352757ce1b Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sat, 6 Jan 2018 15:33:57 -0800 Subject: [PATCH 06/25] Update readme to show JSON export options --- README.md | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 56bf989..22bbeb7 100644 --- a/README.md +++ b/README.md @@ -23,9 +23,26 @@ need to modify `settings.conf` for your specific environment; some (insecure) defaults are set to get you up and running. The oracle outputs JSON files. These files are stored in the output -directory specified by the `settings.conf` file. In the future the Oracle -will support saving JSON blobs to cloud object storage, Redis, MySQL, etc. - +directory specified by the `settings.conf` file. You may output these JSON +strings to files by setting `json.output_type` to `file` and +`json.output_location` to a filepath, such as: + +``` +[json] + output_type = file + output_location = ./json +``` + +or you may set `json.output_type` to Redis and give a redis connection string: + +``` +[json] + output_type = redis + output_location = http://localhost:6379 +``` + +Redis password authentication is also supported by adding it to the output +location string, e.g. 
`http://:password@localhost:6379/`. ### Usage From b39b37d1a1b34a2ece3dedb9644691bc7cdf6e5e Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sat, 6 Jan 2018 22:08:53 -0800 Subject: [PATCH 07/25] Add Dockerfile, docker settings for docker-compose --- Dockerfile | 12 ++++++++++++ ethgasstation.py | 2 +- settings.docker.conf | 17 +++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 Dockerfile create mode 100644 settings.docker.conf diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..eb98bbe --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM ubuntu:xenial +RUN apt-get update +RUN apt-get install -y python3 python3-pip + +ADD settings.docker.conf /etc/ethgasstation.conf +ADD ethgasstation.py /opt/ethgasstation/ethgasstation.py +ADD model_gas.py /opt/ethgasstation/model_gas.py +ADD requirements.txt /opt/ethgasstation/requirements.txt +ADD egs/ /opt/ethgasstation/egs/ +RUN pip3 install -r /opt/ethgasstation/requirements.txt + +CMD /usr/bin/python3 /opt/ethgasstation/ethgasstation.py diff --git a/ethgasstation.py b/ethgasstation.py index bd20ea5..e9b7b29 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -124,7 +124,7 @@ def write_to_json(gprecs, txpool_by_gp, prediction_table, analyzed_block,submitt if not submitted_hourago.empty: submitted_hourago = submitted_hourago.to_json(orient='records') - exporter.write_json('hourago', submitted_hourago)= + exporter.write_json('hourago', submitted_hourago) except Exception as e: print(e) diff --git a/settings.docker.conf b/settings.docker.conf new file mode 100644 index 0000000..e0e2160 --- /dev/null +++ b/settings.docker.conf @@ -0,0 +1,17 @@ +# EthGasStation config + +[mysql] + hostname = mariadb + port = 3306 + username = ethgas + password = station + database = tx + +[geth] + hostname = geth + port = 8545 + protocol = http + +[json] + output_type = redis + output_location = http://redis:6379/ \ No newline at end of file From 0867adaa09f26c71b93879377a80b3ad066b2c74 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sat, 6 Jan 2018 22:11:47 -0800 Subject: [PATCH 08/25] clean up this old cruft --- ethgasstation.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/ethgasstation.py b/ethgasstation.py index e9b7b29..4a7293a 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -31,14 +31,6 @@ settings_file = get_settings_filepath(os.path.dirname(os.path.realpath(__file__))) load_settings(settings_file) -json_output_dir = os.path.abspath(get_setting('json', 'output_directory')) -if not os.path.isdir(json_output_dir): - # XXX should be on stderr - print("WARN: Could not find output directory %s" % json_output_dir) - print("WARN: Making directory tree.") - # attempt to make dirs - os.makedirs(json_output_dir, exist_ok=True) - # configure necessary services web3 = Web3( HTTPProvider( From 090464e085ed7c2721c88ad4fcf8c0af17de0296 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sat, 6 Jan 2018 22:21:29 -0800 Subject: [PATCH 09/25] Update README to reflect Dockerisation --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index 22bbeb7..7ae3ebc 100644 --- a/README.md +++ b/README.md @@ -46,5 +46,17 @@ location string, e.g. `http://:password@localhost:6379/`. ### Usage +To run the script as is on bare metal or a VM, manually: + 1. Install requirements using `pip3 install -r requirements.txt` 2. Run `./ethgasstation.py` or `python3 ethgasstation.py`. 
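
[Editor's note: a hedged sketch, not part of the patch series, of reading the exported JSON back out of Redis when the `redis` output type is configured, as the Docker setup below does. Per `JSONExporter._write_json_redis()` in PATCH 05, keys are stored as `"%s_%s" % (redis_key_prefix, key)` and `redis_key_prefix` defaults to the empty string, so the `ethgasAPI` payload lands under `_ethgasAPI`; the host and port here are illustrative.]

```python
import json
import redis

# Connect the way JSONExporter does for a redis:// output_location.
conn = redis.Redis(host='localhost', port=6379)

# Key name follows the exporter's "<prefix>_<key>" scheme with an empty prefix.
raw = conn.get('_ethgasAPI')
if raw is not None:
    gprecs = json.loads(raw.decode('utf-8'))
    print('safeLow: %.1f gwei' % (gprecs['safeLow'] / 10))
```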
+
+It is also possible to run the oracle as a Docker container.
+
+1. Change the settings in settings.docker.conf.
+2. Run `docker build -t ethgasstation-backend .` from this directory.
+3. Run `docker run ethgasstation-backend:latest`.
+
+In the Docker service, the Python script will dump data to JSON on Redis.
+You will need to update your infrastructure to the internal hostnames
+available for MariaDB, Redis, and geth, respectively.

From 922555418aafd5fe599d2df2146d68335de7c3bd Mon Sep 17 00:00:00 2001
From: 10a7 <10a7@users.noreply.github.com>
Date: Sat, 6 Jan 2018 22:24:57 -0800
Subject: [PATCH 10/25] i cant english

---
 README.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 7ae3ebc..c4f40e1 100644
--- a/README.md
+++ b/README.md
@@ -58,5 +58,6 @@ It is also possible to run the oracle as a Docker container.
 3. Run `docker run ethgasstation-backend:latest`.
 
 In the Docker service, the Python script will dump data to JSON on Redis.
-You will need to update your infrastructure to the internal hostnames
-available for MariaDB, Redis, and geth, respectively.
+You will need to update your settings.conf to the internal hostnames
+available for MariaDB, Redis, and geth, respectively within your
+infrastructure.

From 97740a3e164c88eb32ad470f9f67fad78c9f4992 Mon Sep 17 00:00:00 2001
From: 10a7 <10a7@users.noreply.github.com>
Date: Sun, 7 Jan 2018 01:09:51 -0800
Subject: [PATCH 11/25] remove this print statement

---
 egs/settings.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/egs/settings.py b/egs/settings.py
index 0e85d3c..a69763d 100644
--- a/egs/settings.py
+++ b/egs/settings.py
@@ -24,7 +24,6 @@ def get_setting(section, name):
 
     if section in parser_instance:
         if name in parser_instance[section]:
-            print(parser_instance[section][name])
             return parser_instance[section][name]
     raise KeyError("Could not find setting %s.%s in configuration." % (section, name))
 
From 1ba248e9b4b641e782ad4428019a17e8c019357e Mon Sep 17 00:00:00 2001
From: 10a7 <10a7@users.noreply.github.com>
Date: Sun, 7 Jan 2018 12:58:24 -0800
Subject: [PATCH 12/25] normalize directory search between libraries

---
 egs/settings.py | 27 +++++++++++++++------------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/egs/settings.py b/egs/settings.py
index a69763d..0e7cdfa 100644
--- a/egs/settings.py
+++ b/egs/settings.py
@@ -30,16 +30,19 @@ def get_setting(section, name):
 def get_settings_filepath(curdir):
     """Find a valid configuration file.
     Order of priority:
-    1. ./settings.conf
-    2. /etc/ethgasstation/settings.conf
-    3. 
/etc/ethgasstation.conf""" - ap = os.path.abspath(curdir) - if os.path.isfile(os.path.join(ap, 'settings.conf')): - return os.path.join(ap, 'settings.conf') - elif os.path.isdir('/etc'): - if os.path.isdir('/etc/ethgasstation') and \ - os.path.isfile('/etc/ethgasstation/settings.conf'): - return '/etc/ethgasstation/settings.conf' - elif os.path.isfile('/etc/ethgasstation.conf'): - return '/etc/ethgasstation.conf' + '/etc/ethgasstation/settings.conf' + '/etc/ethgasstation.conf' + '/etc/default/ethgasstation.conf' + '/opt/ethgasstation/settings.conf' + """ + default_ini_locations = [ + '/etc/ethgasstation/settings.conf', + '/etc/ethgasstation.conf', + '/etc/default/ethgasstation.conf', + '/opt/ethgasstation/settings.conf' + ] + + for candidate_location in default_ini_locations: + if os.path.isfile(candidate_location): + return candidate_location raise FileNotFoundError("Cannot find EthGasStation settings file.") From 1f225e239c24fb05ab4406893075a4a4f88bda64 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sun, 14 Jan 2018 21:33:53 -0800 Subject: [PATCH 13/25] Add new EGS code, refactor for modularity --- egs/settings.py | 23 +++++ gasStationFull.py => ethgasstation.py | 119 ++++++++++++-------------- model_gas.py | 17 +--- per_block_analysis.py | 27 +++--- report_generator.py | 13 +-- 5 files changed, 102 insertions(+), 97 deletions(-) rename gasStationFull.py => ethgasstation.py (87%) diff --git a/egs/settings.py b/egs/settings.py index 0e7cdfa..75599f9 100644 --- a/egs/settings.py +++ b/egs/settings.py @@ -1,6 +1,8 @@ import configparser import os +from web3 import Web3, HTTPProvider + parser_instance = None settings_loaded = False @@ -46,3 +48,24 @@ def get_settings_filepath(curdir): if os.path.isfile(candidate_location): return candidate_location raise FileNotFoundError("Cannot find EthGasStation settings file.") + +def get_web3_provider(): + """Get Web3 instance.""" + web3 = Web3( + HTTPProvider( + "%s://%s:%s" % ( + get_setting('geth', 'protocol'), + get_setting('geth', 'hostname'), + get_setting('geth', 'port')))) + return web3 + +def get_mysql_connstr(): + """Get a MySQL connection string for SQLAlchemy.""" + connstr = "mysql+mysqlconnector://%s:%s@%s:%s/%s" % ( + get_setting('mysql', 'username'), + get_setting('mysql', 'password'), + get_setting('mysql', 'hostname'), + get_setting('mysql', 'port'), + get_setting('mysql', 'database') + ) + return connstr diff --git a/gasStationFull.py b/ethgasstation.py similarity index 87% rename from gasStationFull.py rename to ethgasstation.py index 5b78e63..881782c 100755 --- a/gasStationFull.py +++ b/ethgasstation.py @@ -1,3 +1,9 @@ +#!/usr/bin/env python3 +""" + ETH Gas Station + Primary backend. 
+""" + import time import sys import json @@ -7,35 +13,30 @@ import random import pandas as pd import numpy as np -from web3 import Web3, HTTPProvider + +import egs.settings + from sqlalchemy import create_engine, Column, Integer, String, DECIMAL, BigInteger, text from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base -from egs import * + +from egs.egs_ref import * +from egs.jsonexporter import JSONExporter, JSONExporterException from per_block_analysis import * from report_generator import * # configure necessary services -web3 = Web3( - HTTPProvider( - "%s://%s:%s" % ( - get_setting('geth', 'protocol'), - get_setting('geth', 'hostname'), - get_setting('geth', 'port')))) - -connstr = "mysql+mysqlconnector://%s:%s@%s:%s/%s" % ( - get_setting('mysql', 'username'), - get_setting('mysql', 'password'), - get_setting('mysql', 'hostname'), - get_setting('mysql', 'port'), - get_setting('mysql', 'database') - ) +settings_file = egs.settings.get_settings_filepath(os.path.dirname(os.path.realpath(__file__))) +egs.settings.load_settings(settings_file) + +exporter = JSONExporter() +web3 = egs.settings.get_web3_provider() +connstr = egs.settings.get_mysql_connstr() engine = create_engine(connstr, echo=False) Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() - def init_dfs(): """load data from mysql""" @@ -76,6 +77,8 @@ def write_to_sql(alltx, block_sumdf, mined_blockdf, block): def write_report(report, top_miners, price_wait, miner_txdata, gasguzz, lowprice): """write json data""" + global exporter + parentdir = os.path.dirname(os.getcwd()) top_minersout = top_miners.to_json(orient='records') minerout = miner_txdata.to_json(orient='records') @@ -90,61 +93,47 @@ def write_report(report, top_miners, price_wait, miner_txdata, gasguzz, lowprice filepath_lowpriceout = parentdir + '/json/validated.json' try: - with open(filepath_report, 'w') as outfile: - json.dump(report, outfile, allow_nan=False) - with open(filepath_tminers, 'w') as outfile: - outfile.write(top_minersout) - with open(filepath_pwait, 'w') as outfile: - outfile.write(price_waitout) - with open(filepath_minerout, 'w') as outfile: - outfile.write(minerout) - with open(filepath_gasguzzout, 'w') as outfile: - outfile.write(gasguzzout) - with open(filepath_lowpriceout, 'w') as outfile: - outfile.write(lowpriceout) - + exporter.write_json('txDataLast10k', report) + exporter.write_json('topMiners', top_minersout) + exporter.write_json('priceWait', price_waitout) + exporter.write_json('miners', minerout) + exporter.write_json('gasguzz', gasguzzout) + exporter.write_json('validated', lowpriceout) except Exception as e: print(e) def write_to_json(gprecs, prediction_table=pd.DataFrame()): """write json data""" + global exporter try: parentdir = os.path.dirname(os.getcwd()) if not prediction_table.empty: prediction_table['gasprice'] = prediction_table['gasprice']/10 prediction_tableout = prediction_table.to_json(orient='records') filepath_prediction_table = parentdir + '/json/predictTable.json' - with open(filepath_prediction_table, 'w') as outfile: - outfile.write(prediction_tableout) + exporter.write_json('predictTable', prediction_tableout) filepath_gprecs = parentdir + '/json/ethgasAPI.json' - with open(filepath_gprecs, 'w') as outfile: - json.dump(gprecs, outfile) - - - - + exporter.write_json('ethgasAPI', gprecs) except Exception as e: print(e) - + def master_control(report_option): (blockdata, alltx) = init_dfs() txpool = pd.DataFrame() snapstore = pd.DataFrame() 
print ('blocks '+ str(len(blockdata))) print ('txcount '+ str(len(alltx))) - timer = Timers(web3.eth.blockNumber) + timer = Timers(web3.eth.blockNumber) start_time = time.time() first_cycle = True analyzed = 0 - + def append_new_tx(clean_tx): nonlocal alltx if not clean_tx.hash in alltx.index: alltx = alltx.append(clean_tx.to_dataframe(), ignore_index = False) - - def update_dataframes(block): nonlocal alltx @@ -159,16 +148,16 @@ def update_dataframes(block): mined_block_num = block-3 (mined_blockdf, block_obj) = process_block_transactions(mined_block_num) - #add mined data to tx dataframe + #add mined data to tx dataframe mined_blockdf_seen = mined_blockdf[mined_blockdf.index.isin(alltx.index)] print('num mined in ' + str(mined_block_num)+ ' = ' + str(len(mined_blockdf))) print('num seen in ' + str(mined_block_num)+ ' = ' + str(len(mined_blockdf_seen))) alltx = alltx.combine_first(mined_blockdf) - + #process block data block_sumdf = process_block_data(mined_blockdf, block_obj) - #add block data to block dataframe + #add block data to block dataframe blockdata = blockdata.append(block_sumdf, ignore_index = True) #get hashpower table, block interval time, gaslimit, speed from last 200 blocks @@ -193,8 +182,8 @@ def update_dataframes(block): #make txpool block data txpool_block = make_txpool_block(block, txpool, alltx) - - if not txpool_block.empty: + + if not txpool_block.empty: #new dfs grouped by gasprice and nonce txpool_by_gp = txpool_block[['gas_price', 'round_gp_10gwei']].groupby('round_gp_10gwei').agg({'gas_price':'count'}) txpool_block_nonce = txpool_block[['from_address', 'nonce']].groupby('from_address').agg({'nonce':'min'}) @@ -220,11 +209,11 @@ def update_dataframes(block): try: if txpool_block.notnull: analyzed_block = analyze_txpool(block, txpool_block, hashpower, hpower2, block_time, gaslimit, txatabove_lookup, gp_lookup, gp_lookup2, gprecs) - #update alltx + #update alltx alltx = alltx.combine_first(analyzed_block) except: pass - + #with pd.option_context('display.max_columns', None,): #print(analyzed_block) @@ -254,10 +243,10 @@ def update_dataframes(block): (blockdata, alltx, txpool) = prune_data(blockdata, alltx, txpool, block) return True - except: - print(traceback.format_exc()) + except: + print(traceback.format_exc()) + - while True: try: block = web3.eth.blockNumber @@ -265,20 +254,20 @@ def update_dataframes(block): analyzed = block tx_filter = web3.eth.filter('pending') #get list of txhashes from txpool - print("getting txpool hashes at block " +str(block) +" ...") + print("getting txpool hashes at block " +str(block) +" ...") current_txpool = get_txhases_from_txpool(block) #add txhashes to txpool dataframe print("done. 
length = " +str(len(current_txpool))) txpool = txpool.append(current_txpool, ignore_index = False) except: pass - + try: new_tx_list = web3.eth.getFilterChanges(tx_filter.filter_id) except: tx_filter = web3.eth.filter('pending') new_tx_list = web3.eth.getFilterChanges(tx_filter.filter_id) - + timestamp = time.time() #this can be adjusted depending on how fast your server is @@ -297,9 +286,9 @@ def update_dataframes(block): elif timer.process_block == (block-1) and len(new_tx_list) > 200: print("sampling 200 from " + str(len(new_tx_list)) + " new tx") new_tx_list = random.sample(new_tx_list, 200) - - for new_tx in new_tx_list: - try: + + for new_tx in new_tx_list: + try: tx_obj = web3.eth.getTransaction(new_tx) clean_tx = CleanTx(tx_obj, block, timestamp) append_new_tx(clean_tx) @@ -307,7 +296,7 @@ def update_dataframes(block): pass first_cycle = False - + if (timer.process_block < block): try: test_filter = web3.eth.uninstallFilter(tx_filter.filter_id) @@ -319,14 +308,14 @@ def update_dataframes(block): print ('finished ' + str(timer.process_block) + "\n") timer.process_block = timer.process_block + 1 first_cycle = True - + if (timer.process_block < (block - 8)): print("skipping ahead \n") timer.process_block = (block-1) - - - -if len(sys.argv) > 1: + + + +if len(sys.argv) > 1: report_option = sys.argv[1] # '-r' = make website report else: report_option = False diff --git a/model_gas.py b/model_gas.py index 1a7cb5c..6e96f22 100755 --- a/model_gas.py +++ b/model_gas.py @@ -17,11 +17,10 @@ from sqlalchemy import create_engine from patsy import dmatrices +import egs.settings -from egs.settings import get_settings_filepath, load_settings, get_setting - -settings_file = get_settings_filepath(os.path.dirname(os.path.realpath(__file__))) -load_settings(settings_file) +settings_file = egs.settings.get_settings_filepath(os.path.dirname(os.path.realpath(__file__))) +egs.settings.load_settings(settings_file) # TODO: choose one mysql or the other cnx = mysql.connector.connect( @@ -31,15 +30,7 @@ port=get_setting('mysql', 'port'), database=get_setting('mysql', 'database')) cursor = cnx.cursor() -engine = create_engine( - "mysql+mysqlconnector://%s:%s@%s:%s/%s" % ( - get_setting('mysql', 'username'), - get_setting('mysql', 'password'), - get_setting('mysql', 'hostname'), - get_setting('mysql', 'port'), - get_setting('mysql', 'database') - ), echo=False) - +engine = egs.settings.get_mysql_connstr() query = ("SELECT * FROM minedtx2") cursor.execute(query) head = cursor.column_names diff --git a/per_block_analysis.py b/per_block_analysis.py index 502b56a..c216c72 100755 --- a/per_block_analysis.py +++ b/per_block_analysis.py @@ -1,11 +1,12 @@ import pandas as pd import numpy as np import traceback -from web3 import Web3, HTTPProvider -web3 = Web3(HTTPProvider('http://localhost:8545')) -from egs import * +from egs.egs_ref import * +import egs.settings import modelparams +web3 = egs.settings.get_web3_provider() + def get_txhases_from_txpool(block): """gets list of all txhash in txpool at block and returns dataframe""" hashlist = [] @@ -26,7 +27,7 @@ def process_block_transactions(block): """get tx data from block""" block_df = pd.DataFrame() block_obj = web3.eth.getBlock(block, True) - miner = block_obj.miner + miner = block_obj.miner for transaction in block_obj.transactions: clean_tx = CleanTx(transaction, None, None, miner) block_df = block_df.append(clean_tx.to_dataframe(), ignore_index = False) @@ -70,10 +71,10 @@ def get_tx_atabove(gasprice, txpool_by_gp): def check_recent(gasprice, submitted_recent): 
"""gets the %of transactions unmined submitted in recent blocks""" - + #set this to avoid false positive delays - submitted_recent.loc[(submitted_recent['still_here'] >= 1) & (submitted_recent['still_here'] <= 2) & (submitted_recent['total'] < 4), 'pct_unmined'] = np.nan - maxval = submitted_recent.loc[submitted_recent.index > gasprice, 'pct_unmined'].max() + submitted_recent.loc[(submitted_recent['still_here'] >= 1) & (submitted_recent['still_here'] <= 2) & (submitted_recent['total'] < 4), 'pct_unmined'] = np.nan + maxval = submitted_recent.loc[submitted_recent.index > gasprice, 'pct_unmined'].max() if gasprice in submitted_recent.index: stillh = submitted_recent.get_value(gasprice, 'still_here') if stillh > 2: @@ -81,7 +82,7 @@ def check_recent(gasprice, submitted_recent): else: rval = maxval else: - rval = maxval + rval = maxval if gasprice >= 1000: rval = 0 if (rval > maxval) or (gasprice >= 1000) : @@ -173,12 +174,12 @@ def make_txpool_block(block, txpool, alltx): else: txpool_block = pd.DataFrame() print ('txpool block length 0') - return txpool_block + return txpool_block def analyze_nonce(txpool_block, txpool_block_nonce): """flags tx in txpool_block that are chained to lower nonce tx""" txpool_block['num_from'] = txpool_block.groupby('from_address')['block_posted'].transform('count') - #when multiple tx from same from address, finds tx with lowest nonce (unchained) - others are 'chained' + #when multiple tx from same from address, finds tx with lowest nonce (unchained) - others are 'chained' txpool_block['chained'] = txpool_block.apply(check_nonce, args=(txpool_block_nonce,), axis=1) return txpool_block @@ -219,7 +220,7 @@ def make_predcitiontable (hashpower, hpower, avg_timemined, txpool_by_gp, submit predictTable['expectedWait'] = predictTable.apply(predict, axis=1) predictTable['expectedTime'] = predictTable['expectedWait'].apply(lambda x: np.round((x * avg_timemined / 60), decimals=2)) - return (predictTable, txatabove_lookup, gp_lookup, gp_lookup2) + return (predictTable, txatabove_lookup, gp_lookup, gp_lookup2) def get_gasprice_recs(prediction_table, block_time, block, speed, array5m, array30m, minlow=-1, submitted_5mago=None, submitted_30mago=None): """gets gasprice recommendations from txpool and model estimates""" @@ -240,7 +241,7 @@ def gp_from_txpool(timeframe, calc): elif (txpool > calc) and (prediction_table.loc[prediction_table['gasprice'] == (txpool - 10), label_df[0]].values[0] > 15): rec = txpool else: - rec = calc + rec = calc except Exception as e: txpool = np.nan rec = np.nan @@ -295,7 +296,7 @@ def get_fastest(): minhash_list = prediction_table.loc[prediction_table['hashpower_accepting']>95, 'gasprice'] if fastest < minhash_list.min(): fastest = minhash_list.min() - return float(fastest) + return float(fastest) def get_wait(gasprice): try: diff --git a/report_generator.py b/report_generator.py index 5d44e36..4e16c4c 100755 --- a/report_generator.py +++ b/report_generator.py @@ -19,13 +19,13 @@ def get_minedgasprice(row): return row['round_gp_10gwei']/10 else: return np.nan - + self.tx_df['minedGasPrice'] = self.tx_df.apply(get_minedgasprice, axis=1) self.tx_df['gasCat1'] = (self.tx_df['minedGasPrice'] <= 1) & (self.tx_df['minedGasPrice'] >=0) self.tx_df['gasCat2'] = (self.tx_df['minedGasPrice']>1) & (self.tx_df['minedGasPrice']<= 4) self.tx_df['gasCat3'] = (self.tx_df['minedGasPrice']>4) & (self.tx_df['minedGasPrice']<= 20) self.tx_df['gasCat4'] = (self.tx_df['minedGasPrice']>20) & (self.tx_df['minedGasPrice']<= 50) - self.tx_df['gasCat5'] = 
(self.tx_df['minedGasPrice']>50) + self.tx_df['gasCat5'] = (self.tx_df['minedGasPrice']>50) self.block_df['emptyBlocks'] = (self.block_df['numtx']==0).astype(int) self.tx_df['mined'] = self.tx_df['block_mined'].notnull() self.tx_df['delay'] = self.tx_df['block_mined'] - self.tx_df['block_posted'] @@ -58,6 +58,7 @@ def get_minedgasprice(row): self.post['medianDelayTime'] = float(self.tx_df['delay2'].quantile(.5)) """ETH price data""" + # TODO: cache this URI call somewhere url = "https://min-api.cryptocompare.com/data/price?fsym=ETH&tsyms=USD,EUR,GBP,CNY" try: with urllib.request.urlopen(url, timeout=3) as response: @@ -72,7 +73,7 @@ def get_minedgasprice(row): self.post['ETHpriceEUR'] = 0 self.post['ETHpriceCNY'] = 0 self.post['ETHpriceGBP'] = 0 - + """find minimum gas price with at least 50 transactions mined""" tx_grouped_price = self.tx_df[['block_posted', 'minedGasPrice']].groupby('minedGasPrice').count() tx_grouped_price.rename(columns = {'block_posted': 'count'}, inplace = True) @@ -80,7 +81,7 @@ def get_minedgasprice(row): minlow_series = tx_grouped_price[tx_grouped_price['sum']>50].index self.post['minLow'] = float(minlow_series.min()) self.minlow = float(minlow_series.min()*10) - + """generate table with key miner stats""" miner_txdata = self.tx_df[['block_posted', 'miner']].groupby('miner').count() miner_txdata.rename(columns={'block_posted':'count'}, inplace = True) @@ -158,13 +159,13 @@ def get_minedgasprice(row): grouped_lowprice = lowprice.groupby('gasprice', as_index=False).head(10) grouped_lowprice.reset_index(inplace=True) self.lowprice = grouped_lowprice.sort_values('gasprice', ascending=False) - + """average block time""" blockinterval = self.block_df[['block_number', 'time_mined']].diff() blockinterval.loc[blockinterval['block_number'] > 1, 'time_mined'] = np.nan blockinterval.loc[blockinterval['block_number']< -1, 'time_mined'] = np.nan self.avg_timemined = blockinterval['time_mined'].mean() - + """median wait time by gas price for bar graph""" price_wait = self.tx_df.loc[:, ['minedGasPrice', 'delay2']] price_wait.loc[price_wait['minedGasPrice']>=40, 'minedGasPrice'] = 40 From c166b3d25123e664e7b3c84288eb36444acc794a Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sun, 14 Jan 2018 21:34:47 -0800 Subject: [PATCH 14/25] fix this typo --- ethgasstation.py | 2 +- per_block_analysis.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ethgasstation.py b/ethgasstation.py index 881782c..55429a3 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -255,7 +255,7 @@ def update_dataframes(block): tx_filter = web3.eth.filter('pending') #get list of txhashes from txpool print("getting txpool hashes at block " +str(block) +" ...") - current_txpool = get_txhases_from_txpool(block) + current_txpool = get_txhashes_from_txpool(block) #add txhashes to txpool dataframe print("done. 
length = " +str(len(current_txpool))) txpool = txpool.append(current_txpool, ignore_index = False) diff --git a/per_block_analysis.py b/per_block_analysis.py index c216c72..0e2a8a3 100755 --- a/per_block_analysis.py +++ b/per_block_analysis.py @@ -7,7 +7,7 @@ web3 = egs.settings.get_web3_provider() -def get_txhases_from_txpool(block): +def get_txhashes_from_txpool(block): """gets list of all txhash in txpool at block and returns dataframe""" hashlist = [] try: From bf1fd9b69467479d4982ca7f746c95918c904711 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sun, 14 Jan 2018 21:40:00 -0800 Subject: [PATCH 15/25] Remove old JSON filepaths, as JsonExporter will handle this --- ethgasstation.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/ethgasstation.py b/ethgasstation.py index 55429a3..6a964c5 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -85,12 +85,6 @@ def write_report(report, top_miners, price_wait, miner_txdata, gasguzz, lowprice gasguzzout = gasguzz.to_json(orient='records') lowpriceout = lowprice.to_json(orient='records') price_waitout = price_wait.to_json(orient='records') - filepath_report = parentdir + '/json/txDataLast10k.json' - filepath_tminers = parentdir + '/json/topMiners.json' - filepath_pwait = parentdir + '/json/priceWait.json' - filepath_minerout = parentdir + '/json/miners.json' - filepath_gasguzzout = parentdir + '/json/gasguzz.json' - filepath_lowpriceout = parentdir + '/json/validated.json' try: exporter.write_json('txDataLast10k', report) @@ -110,10 +104,8 @@ def write_to_json(gprecs, prediction_table=pd.DataFrame()): if not prediction_table.empty: prediction_table['gasprice'] = prediction_table['gasprice']/10 prediction_tableout = prediction_table.to_json(orient='records') - filepath_prediction_table = parentdir + '/json/predictTable.json' exporter.write_json('predictTable', prediction_tableout) - filepath_gprecs = parentdir + '/json/ethgasAPI.json' exporter.write_json('ethgasAPI', gprecs) except Exception as e: print(e) From 4aa70407fac6e0d30a24c0d462d6ae95dfbaab58 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sun, 14 Jan 2018 21:55:49 -0800 Subject: [PATCH 16/25] make same settings --- settings.docker.conf | 64 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/settings.docker.conf b/settings.docker.conf index e0e2160..3371232 100644 --- a/settings.docker.conf +++ b/settings.docker.conf @@ -1,5 +1,13 @@ -# EthGasStation config +; EthGasStation Configuration File +; +; This configuration file is used inside containers for all of +; the EthGasStation pipeline. This specific version is used by +; the Dockerfile. + +; MySQL/MariaDB +; EthGasStation's backend uses MariaDB for a relational data store +; to store information about its statistical model. [mysql] hostname = mariadb port = 3306 @@ -7,11 +15,63 @@ password = station database = tx + +; Redis +; EthGasStation uses Redis for fast-access key/value storage, such as +; for session synchronization across web servers, rate limiting, or +; JSON dumps from the backend. +[redis] + ; hostname of redis server + hostname = redis + ; redis port + port = 6379 + ; protocol (supported: redis (tcp), unix (sock)) + protocol = redis + ; path = /path/to/redis.sock + + +; geth +; EthGasStation is dependent upon a working geth node. If you have a single +; geth node, use the "geth" section. RPC is required, with access to `txpool`. 
[geth] + ; hostname of geth node hostname = geth + ; geth rpc port port = 8545 + ; protocol (supported: http) protocol = http + +; JSON +; This will be removed in a future version. [json] output_type = redis - output_location = http://redis:6379/ \ No newline at end of file + output_location = http://redis:6379/ + + +; API-specific settings +; The API layer, ethgasstation-api, uses these settings as its defaults. +; You will want to change them if you are running a public API. +[api] + ; reverse proxy info + ; if the application is behind a load balancer or cloudflare, + ; set "behind_reverse_proxy" to true. + behind_reverse_proxy = false + ; proxy servers send the client IP via a different header, such + ; as x-real-ip or x-forwarded-for. + proxy_real_ip_header = x-forwarded-for + + ; ip rate limiting + ; this rate limits IP addresses to stop bad (or stupid) actors + rate_limit_disable = false + rate_limit_max_requests = 120 + rate_limit_request_window_seconds = 60 + + ; ip or api key banning + ; used to permanently blacklist specific IPs + banhammer_disable = false + banhammer_backing_store = redis + ; if you aren't running redis, you can use flat files. + ; this isn't recommended because it will be slow. + banhammer_api_key_file = /tmp/api_ban.txt + banhammer_ip_addr_file = /tmp/ip_ban.txt \ No newline at end of file From bdbdb1e2a5c3b30724e3ac201d4126a458b63da9 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Wed, 17 Jan 2018 22:43:36 -0800 Subject: [PATCH 17/25] Get some dev env settings overrides in place --- .gitignore | 4 ++++ egs/settings.py | 24 +++++++++++++++++++++--- ethgasstation.py | 5 ++--- per_block_analysis.py | 3 ++- 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index af2f537..281f37d 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,7 @@ venv.bak/ # mypy .mypy_cache/ + +# various dev stuff +settings.dev.conf +test.db diff --git a/egs/settings.py b/egs/settings.py index 75599f9..a0fb9f1 100644 --- a/egs/settings.py +++ b/egs/settings.py @@ -1,5 +1,6 @@ import configparser import os +import sys from web3 import Web3, HTTPProvider @@ -10,11 +11,14 @@ def settings_file_loaded(): global settings_loaded return settings_loaded is True -def load_settings(settings_file): +def load_settings(settings_file=None): """Load settings from a settings file.""" global parser_instance global settings_loaded + if settings_file is None: + settings_file = get_settings_filepath() + """Get settings from INI configuration file.""" parser_instance = configparser.ConfigParser() parser_instance.read(settings_file) @@ -29,7 +33,7 @@ def get_setting(section, name): return parser_instance[section][name] raise KeyError("Could not find setting %s.%s in configuration." % (section, name)) -def get_settings_filepath(curdir): +def get_settings_filepath(): """Find a valid configuration file. 
Order of priority: '/etc/ethgasstation/settings.conf' @@ -44,6 +48,14 @@ def get_settings_filepath(curdir): '/opt/ethgasstation/settings.conf' ] + # short circuit on environment variable + if "SETTINGS_FILE" in os.environ: + path = os.path.join(os.getcwd(), os.environ['SETTINGS_FILE']) + if os.path.isfile(path): + return os.path.abspath(os.environ['SETTINGS_FILE']) + else: + raise FileNotFoundError("Can't find env-set settings file at %s" % path) + for candidate_location in default_ini_locations: if os.path.isfile(candidate_location): return candidate_location @@ -60,7 +72,13 @@ def get_web3_provider(): return web3 def get_mysql_connstr(): - """Get a MySQL connection string for SQLAlchemy.""" + """Get a MySQL connection string for SQLAlchemy, or short circuit to + SQLite for a dev mode.""" + if "USE_SQLITE_DB" in os.environ: + sqlite_db_path = os.path.join(os.getcwd(), os.environ["USE_SQLITE_DB"]) + connstr = "sqlite:///%s" % (sqlite_db_path) + return connstr + connstr = "mysql+mysqlconnector://%s:%s@%s:%s/%s" % ( get_setting('mysql', 'username'), get_setting('mysql', 'password'), diff --git a/ethgasstation.py b/ethgasstation.py index 7ae7e2c..f44d2b9 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -15,6 +15,8 @@ import numpy as np import egs.settings +egs.settings.load_settings() + from sqlalchemy import create_engine, Column, Integer, String, DECIMAL, BigInteger, text from sqlalchemy.orm import sessionmaker @@ -26,9 +28,6 @@ from report_generator import * # configure necessary services -settings_file = egs.settings.get_settings_filepath(os.path.dirname(os.path.realpath(__file__))) -egs.settings.load_settings(settings_file) - exporter = JSONExporter() web3 = egs.settings.get_web3_provider() connstr = egs.settings.get_mysql_connstr() diff --git a/per_block_analysis.py b/per_block_analysis.py index 09ad377..e897e4c 100755 --- a/per_block_analysis.py +++ b/per_block_analysis.py @@ -3,8 +3,9 @@ import traceback from egs.egs_ref import * import egs.settings -import modelparams +import egs.modelparams as modelparams +egs.settings.load_settings() web3 = egs.settings.get_web3_provider() def get_txhashes_from_txpool(block): From 181ac7eac488b668176bc21888fafa27eb61e746 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Wed, 17 Jan 2018 23:08:17 -0800 Subject: [PATCH 18/25] move report generator into egs --- report_generator.py => egs/report_generator.py | 0 ethgasstation.py | 3 ++- 2 files changed, 2 insertions(+), 1 deletion(-) rename report_generator.py => egs/report_generator.py (100%) diff --git a/report_generator.py b/egs/report_generator.py similarity index 100% rename from report_generator.py rename to egs/report_generator.py diff --git a/ethgasstation.py b/ethgasstation.py index f44d2b9..530a5c1 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -24,8 +24,9 @@ from egs.egs_ref import * from egs.jsonexporter import JSONExporter, JSONExporterException +from egs.report_generator import SummaryReport from per_block_analysis import * -from report_generator import * + # configure necessary services exporter = JSONExporter() From c3471398ed7f22677684b45cc91c0fc8aa25f497 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Wed, 17 Jan 2018 23:10:00 -0800 Subject: [PATCH 19/25] fix gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 281f37d..3481f0a 100644 --- a/.gitignore +++ b/.gitignore @@ -106,3 +106,5 @@ venv.bak/ # various dev stuff settings.dev.conf test.db +.vscode +json From 
af18c5d00e57323c29c386ecd02d9d2c56ef58eb Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Thu, 18 Jan 2018 00:19:13 -0800 Subject: [PATCH 20/25] fix some indentation --- egs/report_generator.py | 3 ++- ethgasstation.py | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/egs/report_generator.py b/egs/report_generator.py index 4e16c4c..e7a4271 100755 --- a/egs/report_generator.py +++ b/egs/report_generator.py @@ -5,7 +5,7 @@ import time -class SummaryReport(): +class SummaryReport(object): """analyzes data from last x blocks to create summary stats""" def __init__(self, tx_df, block_df, end_block): self.end_block = end_block @@ -59,6 +59,7 @@ def get_minedgasprice(row): """ETH price data""" # TODO: cache this URI call somewhere + # TODO: cert pin cryptocompare so these are less likely to be tampered with url = "https://min-api.cryptocompare.com/data/price?fsym=ETH&tsyms=USD,EUR,GBP,CNY" try: with urllib.request.urlopen(url, timeout=3) as response: diff --git a/ethgasstation.py b/ethgasstation.py index 530a5c1..ac9c035 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -290,20 +290,20 @@ def update_dataframes(block): first_cycle = False if (timer.process_block < block): - try: - test_filter = web3.eth.uninstallFilter(tx_filter.filter_id) - except: - pass - print('current block ' +str(block)) - print ('processing block ' + str(timer.process_block)) - updated = update_dataframes(timer.process_block) - print ('finished ' + str(timer.process_block) + "\n") - timer.process_block = timer.process_block + 1 - first_cycle = True + try: + test_filter = web3.eth.uninstallFilter(tx_filter.filter_id) + except: + pass + print('current block ' +str(block)) + print ('processing block ' + str(timer.process_block)) + updated = update_dataframes(timer.process_block) + print ('finished ' + str(timer.process_block) + "\n") + timer.process_block = timer.process_block + 1 + first_cycle = True if (timer.process_block < (block - 8)): - print("skipping ahead \n") - timer.process_block = (block-1) + print("skipping ahead \n") + timer.process_block = (block-1) From 77295170ad1a2fb31ff154cd6ac4599ce70688ac Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Fri, 19 Jan 2018 03:24:31 +0000 Subject: [PATCH 21/25] finish initial refactor --- egs/exceptions.py | 6 + egs/main.py | 319 ++++++++++++++++++ egs/modelparams/__init__.py | 0 .../constants.py} | 0 egs/output.py | 81 +++++ .../per_block_analysis.py | 42 +-- ethgasstation.py | 319 +----------------- 7 files changed, 442 insertions(+), 325 deletions(-) create mode 100644 egs/exceptions.py create mode 100644 egs/main.py create mode 100644 egs/modelparams/__init__.py rename egs/{modelparams.py => modelparams/constants.py} (100%) create mode 100644 egs/output.py rename per_block_analysis.py => egs/per_block_analysis.py (95%) diff --git a/egs/exceptions.py b/egs/exceptions.py new file mode 100644 index 0000000..b3f5c82 --- /dev/null +++ b/egs/exceptions.py @@ -0,0 +1,6 @@ +""" +Generic EGS Exceptions +""" + +class EGSError(Exception): + pass \ No newline at end of file diff --git a/egs/main.py b/egs/main.py new file mode 100644 index 0000000..b1a6519 --- /dev/null +++ b/egs/main.py @@ -0,0 +1,319 @@ +""" +ETH Gas Station +Main Event Loop +""" + +import time +import sys +import json +import math +import traceback +import os +import random +import pandas as pd +import numpy as np + +import egs.settings +egs.settings.load_settings() + +from sqlalchemy import create_engine, Column, 
Integer, String, DECIMAL, BigInteger, text +from sqlalchemy.orm import sessionmaker +from sqlalchemy.ext.declarative import declarative_base + +from .egs_ref import * +from .jsonexporter import JSONExporter, JSONExporterException +from .report_generator import SummaryReport +from .per_block_analysis import * +from .output import Output, OutputException + + +# configure necessary services +exporter = JSONExporter() +web3 = egs.settings.get_web3_provider() +connstr = egs.settings.get_mysql_connstr() +engine = create_engine(connstr, echo=False) +Base.metadata.create_all(engine) +Session = sessionmaker(bind=engine) +session = Session() +console = Output() + + +def init_dfs(): + """load data from mysql""" + blockdata = pd.read_sql('SELECT * from blockdata2 order by id desc limit 2000', con=engine) + blockdata = blockdata.drop('id', axis=1) + postedtx = pd.read_sql('SELECT * from postedtx2 order by id desc limit 100000', con=engine) + minedtx = pd.read_sql('SELECT * from minedtx2 order by id desc limit 100000', con=engine) + minedtx.set_index('index', drop=True, inplace=True) + alltx = pd.read_sql('SELECT * from minedtx2 order by id desc limit 100000', con=engine) + alltx.set_index('index', drop=True, inplace=True) + alltx = postedtx[['index', 'expectedTime', 'expectedWait', 'mined_probability', 'highgas2', 'from_address', 'gas_offered', 'gas_price', 'hashpower_accepting', 'num_from', 'num_to', 'ico', 'dump', 'high_gas_offered', 'pct_limit', 'round_gp_10gwei', 'time_posted', 'block_posted', 'to_address', 'tx_atabove', 'wait_blocks', 'chained', 'nonce']].join(minedtx[['block_mined', 'miner', 'time_mined', 'removed_block']], on='index', how='left') + alltx.set_index('index', drop=True, inplace=True) + return(blockdata, alltx) + +def prune_data(blockdata, alltx, txpool, block): + """keep dataframes and databases from getting too big""" + stmt = text("DELETE FROM postedtx2 WHERE block_posted <= :block") + stmt2 = text("DELETE FROM minedtx2 WHERE block_mined <= :block") + deleteBlock_sql = block - 3500 + deleteBlock_mined = block - 1700 + deleteBlock_posted = block - 5500 + engine.execute(stmt, block=deleteBlock_sql) + engine.execute(stmt2, block=deleteBlock_sql) + alltx = alltx.loc[(alltx['block_posted'] > deleteBlock_posted) | (alltx['block_mined'] > deleteBlock_mined)] + blockdata = blockdata.loc[blockdata['block_number'] > deleteBlock_posted] + txpool = txpool.loc[txpool['block'] > (block-10)] + return (blockdata, alltx, txpool) + +def write_to_sql(alltx, block_sumdf, mined_blockdf, block): + """write data to mysql for analysis""" + post = alltx[alltx.index.isin(mined_blockdf.index)] + post.to_sql(con=engine, name='minedtx2', if_exists='append', index=True) + console.info('num mined = ' + str(len(post))) + post2 = alltx.loc[alltx['block_posted'] == (block)] + post2.to_sql(con=engine, name='postedtx2', if_exists='append', index=True) + console.info('num posted = ' + str(len(post2))) + block_sumdf.to_sql(con=engine, name='blockdata2', if_exists='append', index=False) + +def write_report(report, top_miners, price_wait, miner_txdata, gasguzz, lowprice): + """write json data""" + global exporter + + parentdir = os.path.dirname(os.getcwd()) + top_minersout = top_miners.to_json(orient='records') + minerout = miner_txdata.to_json(orient='records') + gasguzzout = gasguzz.to_json(orient='records') + lowpriceout = lowprice.to_json(orient='records') + price_waitout = price_wait.to_json(orient='records') + + try: + exporter.write_json('txDataLast10k', report) + exporter.write_json('topMiners', top_minersout) 
+ exporter.write_json('priceWait', price_waitout) + exporter.write_json('miners', minerout) + exporter.write_json('gasguzz', gasguzzout) + exporter.write_json('validated', lowpriceout) + except Exception as e: + console.error("write_report: Exception caught: " + str(e)) + +def write_to_json(gprecs, prediction_table=pd.DataFrame()): + """write json data""" + global exporter + try: + parentdir = os.path.dirname(os.getcwd()) + if not prediction_table.empty: + prediction_table['gasprice'] = prediction_table['gasprice']/10 + prediction_tableout = prediction_table.to_json(orient='records') + exporter.write_json('predictTable', prediction_tableout) + + exporter.write_json('ethgasAPI', gprecs) + except Exception as e: + console.error("write_to_json: Exception caught: " + str(e)) + +def master_control(args): + + if args.generate_report is True: + report_option = True + + (blockdata, alltx) = init_dfs() + txpool = pd.DataFrame() + snapstore = pd.DataFrame() + console.info('blocks '+ str(len(blockdata))) + console.info('txcount '+ str(len(alltx))) + timer = Timers(web3.eth.blockNumber) + start_time = time.time() + first_cycle = True + analyzed = 0 + + + def append_new_tx(clean_tx): + nonlocal alltx + if not clean_tx.hash in alltx.index: + alltx = alltx.append(clean_tx.to_dataframe(), ignore_index = False) + + def update_dataframes(block): + nonlocal alltx + nonlocal txpool + nonlocal blockdata + nonlocal timer + got_txpool = 1 + + console.info('updating dataframes at block '+ str(block)) + try: + #get minedtransactions and blockdata from previous block + mined_block_num = block-3 + (mined_blockdf, block_obj) = process_block_transactions(mined_block_num) + + #add mined data to tx dataframe + mined_blockdf_seen = mined_blockdf[mined_blockdf.index.isin(alltx.index)] + console.info('num mined in ' + str(mined_block_num)+ ' = ' + str(len(mined_blockdf))) + console.info('num seen in ' + str(mined_block_num)+ ' = ' + str(len(mined_blockdf_seen))) + alltx = alltx.combine_first(mined_blockdf) + + #process block data + console.debug("Processing block data...") + block_sumdf = process_block_data(mined_blockdf, block_obj) + + #add block data to block dataframe + blockdata = blockdata.append(block_sumdf, ignore_index = True) + + #get hashpower table, block interval time, gaslimit, speed from last 200 blocks + (hashpower, block_time, gaslimit, speed) = analyze_last200blocks(block, blockdata) + hpower2 = analyze_last100blocks(block, alltx) + + submitted_30mago = alltx.loc[(alltx['block_posted'] < (block-50)) & (alltx['block_posted'] > (block-120)) & (alltx['chained']==0) & (alltx['gas_offered'] < 500000)].copy() + console.info("# of tx submitted ~ an hour ago: " + str((len(submitted_30mago)))) + + submitted_5mago = alltx.loc[(alltx['block_posted'] < (block-8)) & (alltx['block_posted'] > (block-49)) & (alltx['chained']==0) & (alltx['gas_offered'] < 500000)].copy() + console.info("# of tx submitted ~ 5m ago: " + str((len(submitted_5mago)))) + + if len(submitted_30mago > 50): + submitted_30mago = make_recent_blockdf(submitted_30mago, current_txpool, alltx) + else: + submitted_30mago = pd.DataFrame() + + if len(submitted_5mago > 50): + submitted_5mago = make_recent_blockdf(submitted_5mago, current_txpool, alltx) + else: + submitted_5mago = pd.DataFrame() + + #make txpool block data + txpool_block = make_txpool_block(block, txpool, alltx) + + if not txpool_block.empty: + #new dfs grouped by gasprice and nonce + txpool_by_gp = txpool_block[['gas_price', 
'round_gp_10gwei']].groupby('round_gp_10gwei').agg({'gas_price':'count'}) + txpool_block_nonce = txpool_block[['from_address', 'nonce']].groupby('from_address').agg({'nonce':'min'}) + txpool_block = analyze_nonce(txpool_block, txpool_block_nonce) + else: + txpool_by_gp = pd.DataFrame() + txpool_block_nonce = pd.DataFrame() + txpool_block = alltx.loc[alltx['block_posted']==block] + got_txpool = 0 + + #make prediction table and create lookups to speed txpool analysis + (predictiondf, txatabove_lookup, gp_lookup, gp_lookup2) = make_predcitiontable(hashpower, hpower2, block_time, txpool_by_gp, submitted_5mago, submitted_30mago) + + #with pd.option_context('display.max_rows', None,): + #print(predictiondf) + + #make the gas price recommendations + (gprecs, timer.gp_avg_store, timer.gp_safelow_store) = get_gasprice_recs (predictiondf, block_time, block, speed, timer.gp_avg_store, timer.gp_safelow_store, timer.minlow, submitted_5mago, submitted_30mago) + + #create the txpool block data + #first, add txs submitted if empty + + try: + if txpool_block.notnull: + analyzed_block = analyze_txpool(block, txpool_block, hashpower, hpower2, block_time, gaslimit, txatabove_lookup, gp_lookup, gp_lookup2, gprecs) + #update alltx + alltx = alltx.combine_first(analyzed_block) + except: + pass + + #with pd.option_context('display.max_columns', None,): + #print(analyzed_block) + + #make summary report every x blocks + #this is only run if generating reports for website + if report_option is True: + if timer.check_reportblock(block): + last1500t = alltx[alltx['block_mined'] > (block-1500)].copy() + console.info('txs '+ str(len(last1500t))) + last1500b = blockdata[blockdata['block_number'] > (block-1500)].copy() + console.info('blocks ' + str(len(last1500b))) + report = SummaryReport(last1500t, last1500b, block) + console.debug("Writing summary reports for web...") + write_report(report.post, report.top_miners, report.price_wait, report.miner_txdata, report.gasguzz, report.lowprice) + timer.minlow = report.minlow + + + #every block, write gprecs, predictions, txpool by gasprice + + if got_txpool: + write_to_json(gprecs, predictiondf) + else: + write_to_json(gprecs) + + console.debug("Writing transactions to SQL database") + write_to_sql(alltx, block_sumdf, mined_blockdf, block) + + #keep from getting too large + console.debug("Pruning database") + (blockdata, alltx, txpool) = prune_data(blockdata, alltx, txpool, block) + return True + + except: + console.error(traceback.format_exc()) + + + while True: + try: + block = web3.eth.blockNumber + if first_cycle == True and block != analyzed: + analyzed = block + tx_filter = web3.eth.filter('pending') + #get list of txhashes from txpool + console.info("getting txpool hashes at block " +str(block) +" ...") + current_txpool = get_txhashes_from_txpool(block) + #add txhashes to txpool dataframe + console.info("done. 
length = " +str(len(current_txpool))) + txpool = txpool.append(current_txpool, ignore_index = False) + except: + pass + + try: + console.debug("Getting filter changes...") + new_tx_list = web3.eth.getFilterChanges(tx_filter.filter_id) + except: + console.warn("pending filter missing, re-establishing filter") + tx_filter = web3.eth.filter('pending') + new_tx_list = web3.eth.getFilterChanges(tx_filter.filter_id) + + timestamp = time.time() + + #this can be adjusted depending on how fast your server is + if timer.process_block <= (block-5) and len(new_tx_list) > 10: + console.info("sampling 10 from " + str(len(new_tx_list)) + " new tx") + new_tx_list = random.sample(new_tx_list, 10) + elif timer.process_block == (block-4) and len(new_tx_list) > 25: + console.info("sampling 25 from " + str(len(new_tx_list)) + " new tx") + new_tx_list = random.sample(new_tx_list, 25) + elif timer.process_block == (block-3) and len(new_tx_list) > 50: + console.info("sampling 50 from " + str(len(new_tx_list)) + " new tx") + new_tx_list = random.sample(new_tx_list, 50) + elif timer.process_block == (block-2) and len(new_tx_list) > 100: + console.info("sampling 100 from " + str(len(new_tx_list)) + " new tx") + new_tx_list = random.sample(new_tx_list, 100) + elif timer.process_block == (block-1) and len(new_tx_list) > 200: + console.info("sampling 200 from " + str(len(new_tx_list)) + " new tx") + new_tx_list = random.sample(new_tx_list, 200) + + if new_tx_list: + console.debug("Analyzing %d new transactions from txpool." % len(new_tx_list)) + for new_tx in new_tx_list: + try: + console.debug("Get Tx %s" % new_tx) + tx_obj = web3.eth.getTransaction(new_tx) + clean_tx = CleanTx(tx_obj, block, timestamp) + append_new_tx(clean_tx) + except Exception as e: + console.debug("Exception on Tx %s" % new_tx) + + first_cycle = False + + if (timer.process_block < block): + try: + test_filter = web3.eth.uninstallFilter(tx_filter.filter_id) + except: + pass + console.info('current block ' +str(block)) + console.info('processing block ' + str(timer.process_block)) + updated = update_dataframes(timer.process_block) + console.info('finished ' + str(timer.process_block)) + timer.process_block = timer.process_block + 1 + first_cycle = True + + if (timer.process_block < (block - 8)): + console.warn("blocks jumped, skipping ahead") + timer.process_block = (block-1) \ No newline at end of file diff --git a/egs/modelparams/__init__.py b/egs/modelparams/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/egs/modelparams.py b/egs/modelparams/constants.py similarity index 100% rename from egs/modelparams.py rename to egs/modelparams/constants.py diff --git a/egs/output.py b/egs/output.py new file mode 100644 index 0000000..dead45f --- /dev/null +++ b/egs/output.py @@ -0,0 +1,81 @@ +""" + Output + Handles output to stdout for now, will eventually use logging. 
+""" + +from colored import fg, bg, attr +import time + +class OutputException(Exception): + pass + +class Output(object): + """A wrapper for EGS output to stdout, stderr, etc.""" + + loglevel = None + levels = [ 'off', 'fatal', 'error', 'warn', 'info', 'debug', 'trace' ] + + def __init__(self): + self.set_loglevel('info') + + def set_loglevel(self, level): + if not level.lower() in self.levels: + raise Exception("Unsupported log level %d" % level) + else: + self.loglevel = self.levels.index(level.lower()) + + def log(self, msg, level='info'): + level = self._getlevel(level) + if level <= self.loglevel: + # TODO use stderr, stdout properly + # TODO use logging + print(msg) + + def warn(self, msg): + msg = self._pad(msg) + msg = "%s %s[WARN] %s%s" % (self._logtime(), fg('yellow'), attr(0), msg) + return self.log(msg, 'warn') + + def error(self, msg): + msg = self._pad(msg) + msg = "%s %s[ERROR] %s%s" % (self._logtime(), fg('red'), attr(0), msg) + return self.log(msg, 'error') + + def info(self, msg): + msg = self._pad(msg) + msg = "%s %s[INFO] %s%s" % (self._logtime(), fg('cyan'), attr(0), msg) + return self.log(msg, 'info') + + def debug(self, msg): + msg = self._pad(msg) + msg = "%s %s[DEBUG] %s%s" % (self._logtime(), fg('magenta'), attr(0), msg) + return self.log(msg, 'debug') + + def _logtime(self): + return time.strftime('[%Y-%m-%d %H:%M:%S]') + + def _getlevel(self, level): + level = level.lower() + if not level in self.levels: + raise OutputException("Unsupported level %s" % level) + else: + return self.levels.index(level) + + def _pad(self, string): + """Pad trailing multilines""" + lines = string.split("\n") + skip = True + for line in lines: + if skip: + skip = False + line = " " + line + return "\n".join(lines) + + def banner(self): + """stupid easter eggs are fun""" + print (""" __ __ __ __ _ + ___ / /_/ / ___ ____ ____ ___ / /____ _/ /_(_)__ ___ +/ -_) __/ _ \/ _ `/ _ `(_-<(_-= (block-5)) & (alltx['block_mined'] <= (block))] gp_mined_10th = recent_blocks['gas_price'].quantile(.1) - print (gp_mined_10th/1e8) + console.debug("analyze_last5blocks: " + str(gp_mined_10th/1e8)) return gp_mined_10th/1e8 def make_txpool_block(block, txpool, alltx): @@ -171,10 +174,10 @@ def make_txpool_block(block, txpool, alltx): #txpool_block only has transactions received by filter txpool_block = txpool_block.join(alltx, how='inner') txpool_block = txpool_block.append(alltx.loc[alltx['block_posted']==block]) - print('txpool block length ' + str(len(txpool_block))) + console.info('txpool block length ' + str(len(txpool_block))) else: txpool_block = pd.DataFrame() - print ('txpool block length 0') + console.warn('txpool block length 0') return txpool_block def analyze_nonce(txpool_block, txpool_block_nonce): @@ -235,12 +238,12 @@ def gp_from_txpool(timeframe, calc): try: series = prediction_table.loc[(prediction_table[label_df[0]] <= 5) & (prediction_table[label_df[1]] > 1) & (prediction_table[label_df[2]] > 10), 'gasprice'] txpool = series.min() - print ("\ncalc value :" + str(calc)) - print ('txpool value :' + str(txpool)) + console.debug("\ncalc value: " + str(calc)) + console.debug('txpool value: ' + str(txpool)) if (txpool < calc): rec = txpool elif (txpool > calc) and (prediction_table.loc[prediction_table['gasprice'] == (calc), label_df[0]].values[0] > 15): - print ("txpool>calc") + console.warn("txpool > calc") rec = txpool else: rec = calc @@ -327,9 +330,9 @@ def check_recent_mediangp (gprec, gparray, calc): if (gprec <= calc) and (gprec_m > calc): gprec_m = calc except Exception as e: - 
print (e) + console.error("check_recent_mediagp: Exception caught: " + str(e)) gprec_m = gprec - print ('medianizer: ' + str(gprec_m)) + console.debug("medianizer: %s" % str(gprec_m)) return (gprec_m, gparray) gprecs = {} @@ -337,10 +340,9 @@ def check_recent_mediangp (gprec, gparray, calc): (gprecs['safeLow'], gprecs['safelow_calc'], gprecs['safelow_txpool']) = get_safelow() (gprecs['average'], gprecs['average_calc'], gprecs['average_txpool']) = get_average() - + gprecs['fast'] = get_fast() - print("") if gprecs['safelow_txpool'] is not np.nan : array30m = np.append(array30m, gprecs['safelow_txpool']) else: @@ -364,7 +366,7 @@ def check_recent_mediangp (gprec, gparray, calc): gprecs['safeLowWait'] = get_wait(gprecs['safeLow']) gprecs['avgWait'] = get_wait(gprecs['average']) - + gprecs['fastWait'] = get_wait(gprecs['fast']) gprecs['fastest'] = get_fastest() gprecs['fastestWait'] = get_wait(gprecs['fastest']) @@ -397,8 +399,8 @@ def analyze_txpool(block, txpool_block, hashpower, hpower, avg_timemined, gaslim """calculate data for transactions in the txpoolblock""" txpool_block = txpool_block.loc[txpool_block['block_posted']==block].copy() txpool_block['pct_limit'] = txpool_block['gas_offered'].apply(lambda x: x / gaslimit) - txpool_block['high_gas_offered'] = (txpool_block['pct_limit'] > modelparams.HIGHGAS1).astype(int) - txpool_block['highgas2'] = (txpool_block['pct_limit'] > modelparams.HIGHGAS2).astype(int) + txpool_block['high_gas_offered'] = (txpool_block['pct_limit'] > HIGHGAS1).astype(int) + txpool_block['highgas2'] = (txpool_block['pct_limit'] > HIGHGAS2).astype(int) txpool_block['hashpower_accepting'] = txpool_block['round_gp_10gwei'].apply(lambda x: gp_lookup[x] if x in gp_lookup else 100) txpool_block['hashpower_accepting2'] = txpool_block['round_gp_10gwei'].apply(lambda x: gp_lookup2[x] if x in gp_lookup2 else 100) if txatabove_lookup is not None: diff --git a/ethgasstation.py b/ethgasstation.py index ac9c035..3fba885 100755 --- a/ethgasstation.py +++ b/ethgasstation.py @@ -4,312 +4,21 @@ Primary backend. 
""" -import time -import sys -import json -import math -import traceback -import os -import random -import pandas as pd -import numpy as np +import argparse +from egs.main import master_control +from egs.output import Output -import egs.settings -egs.settings.load_settings() +def main(): + """Parse command line options.""" + parser = argparse.ArgumentParser(description="An adaptive gas price oracle for Ethereum.") + parser.add_argument('--generate-report', action='store_true', help="Generate reports for ethgasstation-frontend.") + args = parser.parse_args() -from sqlalchemy import create_engine, Column, Integer, String, DECIMAL, BigInteger, text -from sqlalchemy.orm import sessionmaker -from sqlalchemy.ext.declarative import declarative_base + # kick off the egs main event loop + master_control(args) -from egs.egs_ref import * -from egs.jsonexporter import JSONExporter, JSONExporterException -from egs.report_generator import SummaryReport -from per_block_analysis import * - - -# configure necessary services -exporter = JSONExporter() -web3 = egs.settings.get_web3_provider() -connstr = egs.settings.get_mysql_connstr() -engine = create_engine(connstr, echo=False) -Base.metadata.create_all(engine) -Session = sessionmaker(bind=engine) -session = Session() - - -def init_dfs(): - """load data from mysql""" - blockdata = pd.read_sql('SELECT * from blockdata2 order by id desc limit 2000', con=engine) - blockdata = blockdata.drop('id', axis=1) - postedtx = pd.read_sql('SELECT * from postedtx2 order by id desc limit 100000', con=engine) - minedtx = pd.read_sql('SELECT * from minedtx2 order by id desc limit 100000', con=engine) - minedtx.set_index('index', drop=True, inplace=True) - alltx = pd.read_sql('SELECT * from minedtx2 order by id desc limit 100000', con=engine) - alltx.set_index('index', drop=True, inplace=True) - alltx = postedtx[['index', 'expectedTime', 'expectedWait', 'mined_probability', 'highgas2', 'from_address', 'gas_offered', 'gas_price', 'hashpower_accepting', 'num_from', 'num_to', 'ico', 'dump', 'high_gas_offered', 'pct_limit', 'round_gp_10gwei', 'time_posted', 'block_posted', 'to_address', 'tx_atabove', 'wait_blocks', 'chained', 'nonce']].join(minedtx[['block_mined', 'miner', 'time_mined', 'removed_block']], on='index', how='left') - alltx.set_index('index', drop=True, inplace=True) - return(blockdata, alltx) - -def prune_data(blockdata, alltx, txpool, block): - """keep dataframes and databases from getting too big""" - stmt = text("DELETE FROM postedtx2 WHERE block_posted <= :block") - stmt2 = text("DELETE FROM minedtx2 WHERE block_mined <= :block") - deleteBlock_sql = block - 3500 - deleteBlock_mined = block - 1700 - deleteBlock_posted = block - 5500 - engine.execute(stmt, block=deleteBlock_sql) - engine.execute(stmt2, block=deleteBlock_sql) - alltx = alltx.loc[(alltx['block_posted'] > deleteBlock_posted) | (alltx['block_mined'] > deleteBlock_mined)] - blockdata = blockdata.loc[blockdata['block_number'] > deleteBlock_posted] - txpool = txpool.loc[txpool['block'] > (block-10)] - return (blockdata, alltx, txpool) - -def write_to_sql(alltx, block_sumdf, mined_blockdf, block): - """write data to mysql for analysis""" - post = alltx[alltx.index.isin(mined_blockdf.index)] - post.to_sql(con=engine, name='minedtx2', if_exists='append', index=True) - print ('num mined = ' + str(len(post))) - post2 = alltx.loc[alltx['block_posted'] == (block)] - post2.to_sql(con=engine, name='postedtx2', if_exists='append', index=True) - print ('num posted = ' + str(len(post2))) - 
block_sumdf.to_sql(con=engine, name='blockdata2', if_exists='append', index=False) - -def write_report(report, top_miners, price_wait, miner_txdata, gasguzz, lowprice): - """write json data""" - global exporter - - parentdir = os.path.dirname(os.getcwd()) - top_minersout = top_miners.to_json(orient='records') - minerout = miner_txdata.to_json(orient='records') - gasguzzout = gasguzz.to_json(orient='records') - lowpriceout = lowprice.to_json(orient='records') - price_waitout = price_wait.to_json(orient='records') - - try: - exporter.write_json('txDataLast10k', report) - exporter.write_json('topMiners', top_minersout) - exporter.write_json('priceWait', price_waitout) - exporter.write_json('miners', minerout) - exporter.write_json('gasguzz', gasguzzout) - exporter.write_json('validated', lowpriceout) - except Exception as e: - print(e) - -def write_to_json(gprecs, prediction_table=pd.DataFrame()): - """write json data""" - global exporter - try: - parentdir = os.path.dirname(os.getcwd()) - if not prediction_table.empty: - prediction_table['gasprice'] = prediction_table['gasprice']/10 - prediction_tableout = prediction_table.to_json(orient='records') - exporter.write_json('predictTable', prediction_tableout) - - exporter.write_json('ethgasAPI', gprecs) - except Exception as e: - print(e) - -def master_control(report_option): - (blockdata, alltx) = init_dfs() - txpool = pd.DataFrame() - snapstore = pd.DataFrame() - print ('blocks '+ str(len(blockdata))) - print ('txcount '+ str(len(alltx))) - timer = Timers(web3.eth.blockNumber) - start_time = time.time() - first_cycle = True - analyzed = 0 - - - def append_new_tx(clean_tx): - nonlocal alltx - if not clean_tx.hash in alltx.index: - alltx = alltx.append(clean_tx.to_dataframe(), ignore_index = False) - - def update_dataframes(block): - nonlocal alltx - nonlocal txpool - nonlocal blockdata - nonlocal timer - got_txpool = 1 - - print('updating dataframes at block '+ str(block)) - try: - #get minedtransactions and blockdata from previous block - mined_block_num = block-3 - (mined_blockdf, block_obj) = process_block_transactions(mined_block_num) - - #add mined data to tx dataframe - mined_blockdf_seen = mined_blockdf[mined_blockdf.index.isin(alltx.index)] - print('num mined in ' + str(mined_block_num)+ ' = ' + str(len(mined_blockdf))) - print('num seen in ' + str(mined_block_num)+ ' = ' + str(len(mined_blockdf_seen))) - alltx = alltx.combine_first(mined_blockdf) - - #process block data - block_sumdf = process_block_data(mined_blockdf, block_obj) - - #add block data to block dataframe - blockdata = blockdata.append(block_sumdf, ignore_index = True) - - #get hashpower table, block interval time, gaslimit, speed from last 200 blocks - (hashpower, block_time, gaslimit, speed) = analyze_last200blocks(block, blockdata) - hpower2 = analyze_last100blocks(block, alltx) - - submitted_30mago = alltx.loc[(alltx['block_posted'] < (block-50)) & (alltx['block_posted'] > (block-120)) & (alltx['chained']==0) & (alltx['gas_offered'] < 500000)].copy() - print("# of tx submitted ~ an hour ago: " + str((len(submitted_30mago)))) - - submitted_5mago = alltx.loc[(alltx['block_posted'] < (block-8)) & (alltx['block_posted'] > (block-49)) & (alltx['chained']==0) & (alltx['gas_offered'] < 500000)].copy() - print("# of tx submitted ~ 5m ago: " + str((len(submitted_5mago)))) - - if len(submitted_30mago > 50): - submitted_30mago = make_recent_blockdf(submitted_30mago, current_txpool, alltx) - else: - submitted_30mago = pd.DataFrame() - - if len(submitted_5mago > 50): - 
submitted_5mago = make_recent_blockdf(submitted_5mago, current_txpool, alltx) - else: - submitted_5mago = pd.DataFrame() - - #make txpool block data - txpool_block = make_txpool_block(block, txpool, alltx) - - if not txpool_block.empty: - #new dfs grouped by gasprice and nonce - txpool_by_gp = txpool_block[['gas_price', 'round_gp_10gwei']].groupby('round_gp_10gwei').agg({'gas_price':'count'}) - txpool_block_nonce = txpool_block[['from_address', 'nonce']].groupby('from_address').agg({'nonce':'min'}) - txpool_block = analyze_nonce(txpool_block, txpool_block_nonce) - else: - txpool_by_gp = pd.DataFrame() - txpool_block_nonce = pd.DataFrame() - txpool_block = alltx.loc[alltx['block_posted']==block] - got_txpool = 0 - - #make prediction table and create lookups to speed txpool analysis - (predictiondf, txatabove_lookup, gp_lookup, gp_lookup2) = make_predcitiontable(hashpower, hpower2, block_time, txpool_by_gp, submitted_5mago, submitted_30mago) - - #with pd.option_context('display.max_rows', None,): - #print(predictiondf) - - #make the gas price recommendations - (gprecs, timer.gp_avg_store, timer.gp_safelow_store) = get_gasprice_recs (predictiondf, block_time, block, speed, timer.gp_avg_store, timer.gp_safelow_store, timer.minlow, submitted_5mago, submitted_30mago) - - #create the txpool block data - #first, add txs submitted if empty - - try: - if txpool_block.notnull: - analyzed_block = analyze_txpool(block, txpool_block, hashpower, hpower2, block_time, gaslimit, txatabove_lookup, gp_lookup, gp_lookup2, gprecs) - #update alltx - alltx = alltx.combine_first(analyzed_block) - except: - pass - - #with pd.option_context('display.max_columns', None,): - #print(analyzed_block) - - #make summary report every x blocks - #this is only run if generating reports for website - if report_option == '-r': - if timer.check_reportblock(block): - last1500t = alltx[alltx['block_mined'] > (block-1500)].copy() - print('txs '+ str(len(last1500t))) - last1500b = blockdata[blockdata['block_number'] > (block-1500)].copy() - print('blocks ' + str(len(last1500b))) - report = SummaryReport(last1500t, last1500b, block) - write_report(report.post, report.top_miners, report.price_wait, report.miner_txdata, report.gasguzz, report.lowprice) - timer.minlow = report.minlow - - - #every block, write gprecs, predictions, txpool by gasprice - - if got_txpool: - write_to_json(gprecs, predictiondf) - else: - write_to_json(gprecs) - - write_to_sql(alltx, block_sumdf, mined_blockdf, block) - - #keep from getting too large - (blockdata, alltx, txpool) = prune_data(blockdata, alltx, txpool, block) - return True - - except: - print(traceback.format_exc()) - - - while True: - try: - block = web3.eth.blockNumber - if first_cycle == True and block != analyzed: - analyzed = block - tx_filter = web3.eth.filter('pending') - #get list of txhashes from txpool - print("getting txpool hashes at block " +str(block) +" ...") - current_txpool = get_txhashes_from_txpool(block) - #add txhashes to txpool dataframe - print("done. 
length = " +str(len(current_txpool))) - txpool = txpool.append(current_txpool, ignore_index = False) - except: - pass - - try: - new_tx_list = web3.eth.getFilterChanges(tx_filter.filter_id) - except: - tx_filter = web3.eth.filter('pending') - new_tx_list = web3.eth.getFilterChanges(tx_filter.filter_id) - - timestamp = time.time() - - #this can be adjusted depending on how fast your server is - if timer.process_block <= (block-5) and len(new_tx_list) > 10: - print("sampling 10 from " + str(len(new_tx_list)) + " new tx") - new_tx_list = random.sample(new_tx_list, 10) - elif timer.process_block == (block-4) and len(new_tx_list) > 25: - print("sampling 25 from " + str(len(new_tx_list)) + " new tx") - new_tx_list = random.sample(new_tx_list, 25) - elif timer.process_block == (block-3) and len(new_tx_list) > 50: - print("sampling 50 from " + str(len(new_tx_list)) + " new tx") - new_tx_list = random.sample(new_tx_list, 50) - elif timer.process_block == (block-2) and len(new_tx_list) > 100: - print("sampling 100 from " + str(len(new_tx_list)) + " new tx") - new_tx_list = random.sample(new_tx_list, 100) - elif timer.process_block == (block-1) and len(new_tx_list) > 200: - print("sampling 200 from " + str(len(new_tx_list)) + " new tx") - new_tx_list = random.sample(new_tx_list, 200) - - for new_tx in new_tx_list: - try: - tx_obj = web3.eth.getTransaction(new_tx) - clean_tx = CleanTx(tx_obj, block, timestamp) - append_new_tx(clean_tx) - except Exception as e: - pass - - first_cycle = False - - if (timer.process_block < block): - try: - test_filter = web3.eth.uninstallFilter(tx_filter.filter_id) - except: - pass - print('current block ' +str(block)) - print ('processing block ' + str(timer.process_block)) - updated = update_dataframes(timer.process_block) - print ('finished ' + str(timer.process_block) + "\n") - timer.process_block = timer.process_block + 1 - first_cycle = True - - if (timer.process_block < (block - 8)): - print("skipping ahead \n") - timer.process_block = (block-1) - - - -if len(sys.argv) > 1: - report_option = sys.argv[1] # '-r' = make website report -else: - report_option = False - -master_control(report_option) +if __name__ == '__main__': + o = Output() + o.banner() + main() From b76838f4819979158ea4ab8acc26cf769f02997f Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Fri, 19 Jan 2018 03:27:40 +0000 Subject: [PATCH 22/25] Set settings to mimic classic, update README --- README.md | 3 +++ settings.conf | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c4f40e1..7dfc7b1 100755 --- a/README.md +++ b/README.md @@ -51,6 +51,9 @@ To run the script as is on bare metal or a VM, manually: 1. Install requirements using `pip3 install -r requirements.txt` 2. Run `./ethgasstation.py` or `python3 ethgasstation.py`. +If you are running a frontend to ETH Gas Station, use the `--generate-report` +flag to generate detailed JSON reports for front-end or API consumption. + It is also possible to run the oracle as a Docker container. 1. Change the settings in settings.docker.conf. 
diff --git a/settings.conf b/settings.conf index 43633d2..3a2576b 100644 --- a/settings.conf +++ b/settings.conf @@ -14,4 +14,4 @@ [json] output_type = file - output_location = ./json + output_location = ../json From af1fe626e4f9f4f7e2c4a770061f49ee999fa6e1 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Fri, 19 Jan 2018 17:26:52 +0000 Subject: [PATCH 23/25] add readme clarification --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 7dfc7b1..8c31190 100755 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ location string, e.g. `http://:password@localhost:6379/`. To run the script as is on bare metal or a VM, manually: +0. Edit `settings.conf` and install to [an allowed directory](https://github.com/ethgasstation/ethgasstation-backend/pull/17/files#diff-bbda44d05044576b25a2c6cf4b0c3597R37). 1. Install requirements using `pip3 install -r requirements.txt` 2. Run `./ethgasstation.py` or `python3 ethgasstation.py`. From dd8ddc186c897851eaf0b92ce6298018ab3cc154 Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Fri, 19 Jan 2018 17:33:59 +0000 Subject: [PATCH 24/25] move old settings config to settings.classic.conf --- settings.conf => settings.classic.conf | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename settings.conf => settings.classic.conf (100%) diff --git a/settings.conf b/settings.classic.conf similarity index 100% rename from settings.conf rename to settings.classic.conf From a20fe20f3ac79cc16c61d5ebbf29d3e3582e2e7f Mon Sep 17 00:00:00 2001 From: 10a7 <10a7@users.noreply.github.com> Date: Sat, 20 Jan 2018 19:22:48 +0000 Subject: [PATCH 25/25] fix report_option failure --- egs/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/egs/main.py b/egs/main.py index b1a6519..e96f45b 100644 --- a/egs/main.py +++ b/egs/main.py @@ -111,7 +111,7 @@ def write_to_json(gprecs, prediction_table=pd.DataFrame()): console.error("write_to_json: Exception caught: " + str(e)) def master_control(args): - + report_option = False if args.generate_report is True: report_option = True
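Taken together, patches 17 through 25 leave the backend runnable in a development setup with no MariaDB and no system-wide settings file. A usage sketch — the file names echo the `settings.dev.conf` and `test.db` entries added to .gitignore in patch 17, and are otherwise arbitrary:

    # dev config via env override, SQLite instead of MariaDB,
    # plus the front-end JSON reports from patch 21's --generate-report
    SETTINGS_FILE=settings.dev.conf USE_SQLITE_DB=test.db \
        python3 ethgasstation.py --generate-report

One small note on patch 25's fix: since argparse's `store_true` action always yields a boolean, `report_option = bool(args.generate_report)` (or simply `report_option = args.generate_report`) would do the same job in one line.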