Skip to content
This repository has been archived by the owner on Jul 19, 2024. It is now read-only.

Commit

Permalink
Merge branch 'current' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
JahPowerBit committed Jan 3, 2015
2 parents 4672f60 + 479f7af commit 918fed2
Show file tree
Hide file tree
Showing 24 changed files with 229 additions and 175 deletions.
5 changes: 0 additions & 5 deletions api_handler.py

This file was deleted.

70 changes: 42 additions & 28 deletions counterblockd.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,19 @@
import datetime
import time

from lib import config, log, blockfeed, util, module
from lib import config, log, blockfeed, util, module, database
from lib.processor import messages, caughtup, startup #to kick off processors
from lib.processor import StartUpProcessor

logger = logging.getLogger(__name__)

if __name__ == '__main__':
# Parse command-line arguments.
parser = argparse.ArgumentParser(prog='counterblockd', description='Counterwallet daemon. Works with counterpartyd')
subparsers = parser.add_subparsers(dest='action', help='the action to be taken')
parser_server = subparsers.add_parser('server', help='Run Counterblockd')

parser_dismod = subparsers.add_parser('dismod', help='Disable a module')
parser_dismod.add_argument('module_path', type=str, help='Path of module to Disable relative to Counterblockd directory')
parser_enmod = subparsers.add_parser('enmod', help='Enable a module')
parser_enmod.add_argument('module_path', type=str, help='Full Path of module to Enable relative to Counterblockd directory')
parser_listmod = subparsers.add_parser('listmod', help='Display Module Config')

#args
parser.add_argument('-V', '--version', action='version', version="counterblockd v%s" % config.VERSION)
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', default=False, help='sets log level to DEBUG instead of WARNING')
parser.add_argument('--enmod', type=str, help='Enable a module')
parser.add_argument('--reparse', action='store_true', default=False, help='force full re-initialization of the counterblockd database')
parser.add_argument('--testnet', action='store_true', default=False, help='use Bitcoin testnet addresses and block numbers')
parser.add_argument('--data-dir', help='specify to explicitly override the directory in which to keep the config file and log file')
Expand Down Expand Up @@ -77,37 +72,56 @@
parser.add_argument('--socketio-chat-host', help='the interface on which to host the counterblockd socket.io chat API')
parser.add_argument('--socketio-chat-port', type=int, help='port on which to provide the counterblockd socket.io chat API')

if len(sys.argv) < 2: sys.argv.append('server')
if not [i for i in sys.argv if i in ('server', 'enmod', 'dismod', 'listmod')]: sys.argv.append('server')

parser.add_argument('--support-email', help='the email address where support requests should go')
parser.add_argument('--email-server', help='the email server to send support requests out from. Defaults to \'localhost\'')
args = parser.parse_args()

#actions
subparsers = parser.add_subparsers(dest='action', help='the action to be taken')
parser_server = subparsers.add_parser('server', help='Run Counterblockd')
parser_enmod = subparsers.add_parser('enmod', help='Enable a module')
parser_enmod.add_argument('module_path', type=str, help='Full Path of module to Enable relative to Counterblockd directory')
parser_dismod = subparsers.add_parser('dismod', help='Disable a module')
parser_dismod.add_argument('module_path', type=str, help='Path of module to Disable relative to Counterblockd directory')
parser_listmod = subparsers.add_parser('listmod', help='Display Module Config')
parser_rollback = subparsers.add_parser('rollback', help='Rollback to a specific block number')
parser_rollback.add_argument('block_index', type=int, help='Block index to roll back to')

#default to server arg
if len(sys.argv) < 2: sys.argv.append('server')
if not [i for i in sys.argv if i in ('server', 'enmod', 'dismod', 'listmod', 'rollback')]:
sys.argv.append('server')

args = parser.parse_args()

config.init(args)
log.set_up(args.verbose)

#Do Module Args Actions
if args.action == 'enmod':
module.toggle(args.module_path, True)
sys.exit(0)
if args.action == 'dismod':
module.toggle(args.module_path, False)
sys.exit(0)
if args.action == 'listmod':
module.list_all()
sys.exit(0)

#Create/update pid file
pid = str(os.getpid())
pidf = open(config.PID, 'w')
pidf.write(pid)
pidf.close()

log.set_up(args.verbose)
logging.info("counterblock Version %s starting ..." % config.VERSION)

#load any 3rd party modules
module.load_all()

#Handle arguments
if args.action == 'enmod':
module.toggle(args.module_path, True)
sys.exit(0)
elif args.action == 'dismod':
module.toggle(args.module_path, False)
sys.exit(0)
elif args.action == 'listmod':
module.list_all()
sys.exit(0)
elif args.action == 'rollback':
assert args.block_index >= 1
startup.init_mongo()
database.rollback(args.block_index)
sys.exit(0)

logger.info("counterblock Version %s starting ..." % config.VERSION)

#Run Startup Functions
StartUpProcessor.run_active_functions()
18 changes: 13 additions & 5 deletions docs/Modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ the `counterblockd base-dir`. i.e.
[LoadModule]
'lib/vendor' = True
To change the default behavior for Counterblockd modules/events, change the corresponding processor config.
To change the default behavior for ``counterblockd`` modules/events, change the corresponding processor config.
(Note: function names must be exact.)

To disable a processor:
Expand Down Expand Up @@ -53,7 +53,7 @@ To list loaded modules and processors:

Adding Custom Methods
-----------------------------------
For Adding custom methods to built in Counterblockd processors. The general syntax is:
For Adding custom methods to built in ``counterblockd`` processors. The general syntax is:

.. code-block:: python
from lib.processor import <processor_name>
Expand Down Expand Up @@ -101,19 +101,27 @@ want to run a process for every new block (but not when counterblockd is catchin
return
#Do stuff here
``StartUpProcessor`` runs once on Counterblockd startup.
``StartUpProcessor`` runs once on ``counterblockd`` startup.

.. code-block:: python
@StartUpProcessor.subscribe()
def my_db_config():
config.my_db = pymongo.Connection()['my_db']
``CaughtUpProcessor`` runs once when Counterblockd catches up to the latest Counterpartyd block.
``CaughtUpProcessor`` runs once when ``counterblockd`` catches up to the latest Counterpartyd block.

.. code-block:: python
@CaughtUpProcessor.subscribe()
def caughtUpAlert():
print('Counterblockd is now caught up to Counterpartyd!')
print('counterblockd is now caught up to Counterpartyd!')
``RollbackProcessor`` runs whenever the ``counterblockd`` database is rolled back (either due to a blockchain
reorg, or an explicit rollback command being specified to ``counterblockd`` via the command line).

.. code-block:: python
@RollbackProcessor.subscribe()
def rollbackAlert(max_block_index):
print('counterblockd block database rolled back! Anything newer than block index %i removed!' % max_block_index)
To add a method from a module to the API dispatcher:

Expand Down
1 change: 1 addition & 0 deletions lib/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

D = decimal.Decimal
decimal.getcontext().prec = 8
logger = logging.getLogger(__name__)

def round_out(num):
"""round out to 8 decimal places"""
Expand Down
37 changes: 19 additions & 18 deletions lib/blockfeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from lib.processor import MessageProcessor, BlockProcessor, CaughtUpProcessor

D = decimal.Decimal
logger = logging.getLogger(__name__)

def fuzzy_is_caught_up():
"""We don't want to give users 525 errors or login errors if counterblockd/counterpartyd is in the process of
Expand All @@ -42,8 +43,8 @@ def process_cpd_blockfeed(zmq_publisher_eventfeed):
#^ set after we are caught up and start up the recurring events that depend on us being caught up with the blockchain

#enabled processor functions
logging.debug("Enabled Message Processor Functions {0}".format(MessageProcessor.active_functions()))
logging.debug("Enabled Block Processor Functions {0}".format(BlockProcessor.active_functions()))
logger.debug("Enabled Message Processor Functions {0}".format(MessageProcessor.active_functions()))
logger.debug("Enabled Block Processor Functions {0}".format(BlockProcessor.active_functions()))

def publish_mempool_tx():
"""fetch new tx from mempool"""
Expand Down Expand Up @@ -77,7 +78,7 @@ def publish_mempool_tx():
del(tx['_id'])
tx['_category'] = tx['category']
tx['_message_index'] = 'mempool'
logging.debug("Spotted mempool tx: %s" % tx)
logger.debug("Spotted mempool tx: %s" % tx)
zmq_publisher_eventfeed.send_json(tx)

def clean_mempool_tx():
Expand All @@ -87,13 +88,13 @@ def clean_mempool_tx():

def parse_message(msg):
msg_data = json.loads(msg['bindings'])
logging.debug("Received message %s: %s ..." % (msg['message_index'], msg))
logger.debug("Received message %s: %s ..." % (msg['message_index'], msg))

#out of order messages should not happen (anymore), but just to be sure
assert msg['message_index'] == config.state['last_message_index'] + 1 or config.state['last_message_index'] == -1

for function in MessageProcessor.active_functions():
logging.debug('starting {}'.format(function['function']))
logger.debug('starting {}'.format(function['function']))
cmd = function['function'](msg, msg_data) or None
#break or *return* (?) depends on whether we want config.last_message_index to be updated
if cmd == 'continue': break
Expand All @@ -111,7 +112,7 @@ def parse_block(block_data):
for msg in config.state['cur_block']['_messages']:
cmd = parse_message(msg)
if cmd == 'break': break
#logging.debug("*config.state* {}".format(config.state))
#logger.debug("*config.state* {}".format(config.state))

#Run Block Processor Functions
BlockProcessor.run_active_functions()
Expand All @@ -125,7 +126,7 @@ def parse_block(block_data):

config.state['my_latest_block'] = new_block

logging.info("Block: %i of %i [message height=%s]" % (
logger.info("Block: %i of %i [message height=%s]" % (
config.state['my_latest_block']['block_index'],
config.state['cpd_backend_block_height'] \
if config.state['cpd_backend_block_height'] else '???',
Expand All @@ -142,11 +143,11 @@ def parse_block(block_data):
or app_config[0]['db_version'] != config.DB_VERSION
or app_config[0]['running_testnet'] != config.TESTNET):
if app_config.count():
logging.warn("counterblockd database version UPDATED (from %i to %i) or testnet setting changed (from %s to %s), or REINIT forced (%s). REBUILDING FROM SCRATCH ..." % (
logger.warn("counterblockd database version UPDATED (from %i to %i) or testnet setting changed (from %s to %s), or REINIT forced (%s). REBUILDING FROM SCRATCH ..." % (
app_config[0]['db_version'], config.DB_VERSION, app_config[0]['running_testnet'],
config.TESTNET, config.REPARSE_FORCED))
else:
logging.warn("counterblockd database app_config collection doesn't exist. BUILDING FROM SCRATCH...")
logger.warn("counterblockd database app_config collection doesn't exist. BUILDING FROM SCRATCH...")
app_config = database.reset_db_state()
config.state['my_latest_block'] = config.LATEST_BLOCK_INIT
else:
Expand All @@ -155,7 +156,7 @@ def parse_block(block_data):
my_latest_block = config.mongo_db.processed_blocks.find_one(sort=[("block_index", pymongo.DESCENDING)]) or config.LATEST_BLOCK_INIT
#remove any data we have for blocks higher than this (would happen if counterblockd or mongo died
# or errored out while processing a block)
config.state['my_latest_block'] = database.prune_my_stale_blocks(my_latest_block['block_index'])
config.state['my_latest_block'] = database.rollback(my_latest_block['block_index'])

#avoid contacting counterpartyd (on reparse, to speed up)
autopilot = False
Expand All @@ -176,20 +177,20 @@ def parse_block(block_data):
or app_config['counterpartyd_running_testnet'] is None:
updatePrefs = True
elif cpd_running_info['version_major'] != app_config['counterpartyd_db_version_major']:
logging.warn("counterpartyd MAJOR DB version change (we built from %s, counterpartyd is at %s)."
logger.warn("counterpartyd MAJOR DB version change (we built from %s, counterpartyd is at %s)."
+ " Wiping our state data." % (
app_config['counterpartyd_db_version_major'], cpd_running_info['version_major']))
wipeState = True
updatePrefs = True
elif cpd_running_info['version_minor'] != app_config['counterpartyd_db_version_minor']:
logging.warn("counterpartyd MINOR DB version change (we built from %s.%s, counterpartyd is at %s.%s)."
logger.warn("counterpartyd MINOR DB version change (we built from %s.%s, counterpartyd is at %s.%s)."
+ " Wiping our state data." % (
app_config['counterpartyd_db_version_major'], app_config['counterpartyd_db_version_minor'],
cpd_running_info['version_major'], cpd_running_info['version_minor']))
wipeState = True
updatePrefs = True
elif cpd_running_info.get('running_testnet', False) != app_config['counterpartyd_running_testnet']:
logging.warn("counterpartyd testnet setting change (from %s to %s). Wiping our state data." % (
logger.warn("counterpartyd testnet setting change (from %s to %s). Wiping our state data." % (
app_config['counterpartyd_running_testnet'], cpd_running_info['running_testnet']))
wipeState = True
updatePrefs = True
Expand All @@ -209,7 +210,7 @@ def parse_block(block_data):
config.state['cpd_backend_block_height'] = cpd_running_info['bitcoin_block_count']

if not config.state['cpd_latest_block']['block_index']:
logging.warn("counterpartyd has no last processed block (probably is reparsing or was just restarted)."
logger.warn("counterpartyd has no last processed block (probably is reparsing or was just restarted)."
+ " Waiting 3 seconds before trying again...")
time.sleep(3)
continue
Expand All @@ -232,7 +233,7 @@ def parse_block(block_data):
block_data = cache.get_block_info(cur_block_index,
min(100, (config.state['cpd_latest_block']['block_index'] - config.state['my_latest_block']['block_index'])))
except Exception, e:
logging.warn(str(e) + " Waiting 3 seconds before trying again...")
logger.warn(str(e) + " Waiting 3 seconds before trying again...")
time.sleep(3)
continue

Expand All @@ -245,9 +246,9 @@ def parse_block(block_data):
elif config.state['my_latest_block']['block_index'] > config.state['cpd_latest_block']['block_index']:
# should get a reorg message. Just to be on the safe side, prune back MAX_REORG_NUM_BLOCKS blocks
# before what counterpartyd is saying if we see this
logging.error("Very odd: Ahead of counterpartyd with block indexes! Pruning back %s blocks to be safe."
logger.error("Very odd: Ahead of counterpartyd with block indexes! Pruning back %s blocks to be safe."
% config.MAX_REORG_NUM_BLOCKS)
config.state['my_latest_block'] = database.prune_my_stale_blocks(
config.state['my_latest_block'] = database.rollback(
config.state['cpd_latest_block']['block_index'] - config.MAX_REORG_NUM_BLOCKS)
else:
#...we may be caught up (to counterpartyd), but counterpartyd may not be (to the blockchain). And if it isn't, we aren't
Expand All @@ -261,7 +262,7 @@ def parse_block(block_data):
config.state['last_message_index'] = cpd_running_info['last_message_index']
if config.state['my_latest_block']['block_index'] == 0:
config.state['my_latest_block']['block_index'] = cpd_running_info['last_block']['block_index']
logging.info("Detected blocks caught up on startup. Setting last message idx to %s, current block index to %s ..." % (
logger.info("Detected blocks caught up on startup. Setting last message idx to %s, current block index to %s ..." % (
config.state['last_message_index'], config.state['my_latest_block']['block_index']))

if config.state['caught_up'] and not config.state['caught_up_started_events']:
Expand Down
12 changes: 7 additions & 5 deletions lib/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@

from lib import config, util

logger = logging.getLogger(__name__)

##
## REDIS-RELATED
##
def get_redis_connection():
logging.info("Connecting to redis @ %s" % config.REDIS_CONNECT)
logger.info("Connecting to redis @ %s" % config.REDIS_CONNECT)
return redis.StrictRedis(host=config.REDIS_CONNECT, port=config.REDIS_PORT, db=config.REDIS_DATABASE)

##
Expand Down Expand Up @@ -44,7 +46,7 @@ def cached_function(*args, **kwargs):
cached_result = config.mongo_db.counterblockd_cache.find_one({'block_index': block_index, 'function': function_signature})

if not cached_result or config.TESTNET:
#logging.info("generate cache ({}, {}, {})".format(func.__name__, block_index, function_signature))
#logger.info("generate cache ({}, {}, {})".format(func.__name__, block_index, function_signature))
try:
result = func(*args, **kwargs)
config.mongo_db.counterblockd_cache.insert({
Expand All @@ -54,15 +56,15 @@ def cached_function(*args, **kwargs):
})
return result
except Exception, e:
logging.exception(e)
logger.exception(e)
else:
#logging.info("result from cache ({}, {}, {})".format(func.__name__, block_index, function_signature))
#logger.info("result from cache ({}, {}, {})".format(func.__name__, block_index, function_signature))
result = json.loads(cached_result['result'])
return result

return cached_function


def clean_block_cache(block_index):
#logging.info("clean block cache lower than {}".format(block_index))
#logger.info("clean block cache lower than {}".format(block_index))
config.mongo_db.counterblockd_cache.remove({'block_index': {'$lt': block_index}})
Loading

0 comments on commit 918fed2

Please sign in to comment.