-
Notifications
You must be signed in to change notification settings - Fork 129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
500 plugin infrastructure #502
base: devel
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import asyncio | ||
import threading | ||
import importlib | ||
import logging | ||
|
||
import dexbot.plugins | ||
from dexbot.helper import iter_namespace | ||
|
||
from bitshares import BitShares | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
class PluginInfrastructure(threading.Thread): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add one extra line above, PEP8 error. |
||
""" Run plugins as asyncio tasks | ||
|
||
:param dict config: dexbot config | ||
|
||
PluginInfrastructure class is needed to be able to run asyncio plugins while having synchronous core. After | ||
switching to asyncio-aware main thread we may continue to use all plugins without refactoring them. | ||
""" | ||
|
||
def __init__(self, config): | ||
super().__init__() | ||
|
||
self.bitshares = BitShares(node=config['node'], num_retries=-1) | ||
self.config = config | ||
self.loop = None | ||
self.need_stop = False | ||
self.plugins = [] | ||
|
||
def run(self): | ||
log.debug('Starting PluginInfrastructure thread') | ||
self.init_plugins() | ||
self.loop = asyncio.new_event_loop() | ||
asyncio.set_event_loop(self.loop) | ||
self.loop.create_task(self.run_plugins()) | ||
self.loop.create_task(self.stop_handler()) | ||
self.loop.run_forever() | ||
|
||
def init_plugins(self): | ||
""" Initialize plugin instances | ||
""" | ||
plugins = {name: importlib.import_module(name) for finder, name, ispkg in iter_namespace(dexbot.plugins)} | ||
|
||
for name, plugin in plugins.items(): | ||
self.plugins.append(plugin.Plugin(config=self.config, bitshares_instance=self.bitshares)) | ||
|
||
async def run_plugins(self): | ||
""" Run each discovered plugin by calling Plugin.main() | ||
""" | ||
# Schedule every plugin as asyncio Task; use ensure_future() for python3.6 compatibility | ||
tasks = [asyncio.ensure_future(plugin.main()) for plugin in self.plugins] | ||
try: | ||
# Wait until all plugins are finished, but catch exceptions immediately as they occure | ||
await asyncio.gather(*tasks, return_exceptions=False) | ||
except asyncio.CancelledError: | ||
# Note: task.cancel() will not propagate this exception here, so it will appear only on current task cancel | ||
log.debug('Stopping run_plugins()') | ||
except Exception: | ||
log.exception('Task finished with exception:') | ||
|
||
async def stop_handler(self): | ||
""" Watch for self.need_stop flag to cancel tasks and stop the thread | ||
|
||
With this solution it's easier to achieve correct tasks stopping. self.loop.call_soon_threadsafe() requires | ||
additional wrapping to stop tasks or catch exceptions. | ||
""" | ||
while True: | ||
if self.need_stop: | ||
log.debug('Stopping event loop') | ||
tasks = [task for task in asyncio.Task.all_tasks() if task is not asyncio.tasks.Task.current_task()] | ||
# Cancel all tasks | ||
list(map(lambda task: task.cancel(), tasks)) | ||
# Wait for tasks finish | ||
results = await asyncio.gather(*tasks, return_exceptions=True) | ||
log.debug('Finished awaiting cancelled tasks, results: {0}'.format(results)) | ||
# Stop the event loop | ||
self.loop.stop() | ||
return | ||
else: | ||
await asyncio.sleep(1) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
import asyncio | ||
import logging | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class Plugin: | ||
""" Example plugin class | ||
|
||
Plugin must have main() method to run. main() is expected to be an asyncio coroutine | ||
""" | ||
|
||
def __init__(self, config=None, bitshares_instance=None): | ||
pass | ||
|
||
async def do_stuff(self): | ||
log.info('Doing some stuff') | ||
await asyncio.sleep(10) | ||
log.info('Stuff done') | ||
|
||
async def boom(self): | ||
raise Exception('Boom!') | ||
|
||
async def main(self): | ||
try: | ||
while True: | ||
await self.do_stuff() | ||
await asyncio.sleep(5) | ||
await self.boom() | ||
except asyncio.CancelledError: | ||
log.info('Stopping correctly') |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,7 +51,13 @@ def new_func(ctx, *args, **kwargs): | |
# Set the root logger with basic format | ||
ch = logging.StreamHandler() | ||
ch.setFormatter(formatter1) | ||
fh = logging.FileHandler('dexbot.log') | ||
fh.setFormatter(formatter1) | ||
|
||
# Root logger also logs into stream and file with respect of cli-defined verbosity | ||
logging.getLogger("dexbot").addHandler(ch) | ||
logging.getLogger("dexbot").addHandler(fh) | ||
logging.getLogger("dexbot").setLevel(getattr(logging, verbosity.upper())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resolve conflict in ui.py |
||
logging.getLogger("").handlers = [] | ||
|
||
# GrapheneAPI logging | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
import copy | ||
|
||
import dexbot.errors as errors | ||
from dexbot.plugin import PluginInfrastructure | ||
from dexbot.strategies.base import StrategyBase | ||
|
||
from bitshares import BitShares | ||
|
@@ -173,6 +174,8 @@ def add_worker(self, worker_name, config): | |
self.update_notify() | ||
|
||
def run(self): | ||
self.plugins_thread = PluginInfrastructure(self.config) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initialize self.plugins_thread in |
||
self.plugins_thread.start() | ||
self.init_workers(self.config) | ||
self.update_notify() | ||
self.notify.listen() | ||
|
@@ -206,6 +209,9 @@ def stop(self, worker_name=None, pause=False): | |
self.workers[worker].pause() | ||
self.workers = [] | ||
|
||
# Notify plugins to stop | ||
self.plugins_thread.need_stop = True | ||
|
||
# Update other workers | ||
if len(self.workers) > 0: | ||
self.update_notify() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works but to be more specific change to
from bitshares.bitshares import BitShares
.