Core Refactor: Worker, MP asyncio #538

Closed
thehapax opened this issue Apr 5, 2019 · 11 comments

thehapax (Collaborator) commented Apr 5, 2019

Discussion on the core refactor, originated on Telegram.

The goal is to make the worker infrastructure for Dexbot use multiprocessing and asyncio to speed up CPU-bound computation and I/O.

Legacy code no longer used:

  • The state machine in Base Strategy can be removed

Open issues for discussion:

  • Events in Base Strategy: this callback-based event handling needs a heavy refactor, especially if the core (worker infrastructure) will use asyncio/multiprocessing/whatever
  • GUI interaction with the Worker
  • Worker Infrastructure: refactor to use multiprocessing
  • Storage, Logging
  • Strategies
  • Price and Order Engines

Interaction diagrams and UML to be added to the TDD (Technical Design Doc).

thehapax (Collaborator, Author) commented Apr 5, 2019

@bitfag please elaborate on

This callback-based event handling needs heavy refactor, especially if the core (worker infrastructure) will use asyncio/multiprocessing/whatever

bitphage (Collaborator) commented Apr 6, 2019

Thoughts on architecture:

  • Split core and frontends. The core can be a standalone CLI app exposing an API, and different frontends (CLI / GUI / Web) can use this API to interact with the core
  • Use an asyncio event loop as the core's foundation. All core parts are logically split into asyncio.Tasks:
    • API: frontends interaction
    • Plugin infrastructure: runs plugins (each plugin is a separate Task, see plugin infrastructure #502)
    • WorkerInfrastructure Task: manages bitshares subscriptions and handles events. On each incoming event, it adds the event to each worker's queue object. Workers then process all events in parallel, and incoming events never block each other, because WorkerInfrastructure doesn't need to wait for all workers to finish processing the current event before taking the next one.
    • Each worker runs as a separate Task, using a run_forever() coroutine with an infinite loop inside. This coroutine checks the worker's queue and processes events from it
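The fan-out described above can be sketched in a few lines. This is an illustrative toy, not DEXBot code; the names run_forever and the per-worker queues follow the comment, and a list collects what each worker handled just to make the behavior visible:

```python
import asyncio

handled = []  # (worker, event) pairs, to show what each worker processed

async def run_forever(name, queue):
    """Per-worker task: pull events from this worker's own queue."""
    while True:
        event = await queue.get()
        if event is None:          # sentinel used here to stop the sketch
            break
        handled.append((name, event))

async def main():
    # WorkerInfrastructure keeps one queue per worker ...
    queues = {name: asyncio.Queue() for name in ("worker-a", "worker-b")}
    tasks = [asyncio.create_task(run_forever(n, q)) for n, q in queues.items()]

    # ... and adds each incoming event to every queue without waiting
    # for any worker to finish processing it.
    for event in ("block-1", "block-2"):
        for q in queues.values():
            q.put_nowait(event)

    for q in queues.values():
        q.put_nowait(None)         # tell every worker to stop
    await asyncio.gather(*tasks)

asyncio.run(main())
```

Because put_nowait never blocks, the dispatcher can keep accepting events even while a slow worker is still busy with an earlier one.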

What has to be figured out: currently the core (WorkerInfrastructure) is based on bitshares.Notify. On startup it subscribes to markets, blocks, and accounts. bitshares.Notify uses websocket-client callbacks, so basically when something happens on the BitShares chain, Notify gets the event and propagates it to all workers. An async variant of bitshares.Notify has to use the websockets library, which doesn't have callbacks; it uses coroutines. Because it is not yet implemented, I cannot say how exactly we will use it. Probably we will have some for loop which passes events to the workers (by adding them to the workers' queues as described above).
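That "for loop which passes events to the workers" might look roughly like the sketch below. Since the async Notify does not exist yet, a stub async generator stands in for the websocket here; real code would iterate over a websockets connection in the same async-for style:

```python
import asyncio
import json

async def fake_notify_stream():
    """Stand-in for an async websocket yielding subscription notices."""
    for payload in ({"method": "notice", "params": [1]},
                    {"method": "notice", "params": [2]}):
        yield json.dumps(payload)

async def dispatch(worker_queues):
    # the plain loop over incoming messages, fanning each one out
    async for message in fake_notify_stream():
        event = json.loads(message)
        for q in worker_queues:
            q.put_nowait(event)

queues = [asyncio.Queue(), asyncio.Queue()]
asyncio.run(dispatch(queues))
received = [q.get_nowait() for q in queues for _ in range(2)]
```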

For multiprocessing, I don't know what you are planning exactly. If you're going to run each worker as a separate process, you cannot use callbacks at all. Each worker will have to use its own BitShares instance and manage subscriptions on its own. It's just like running N dexbot instances with a single worker in each.

I don't clearly see how exactly we can benefit from multiprocessing. What it may be good for is pushing long-running computations into separate processes, as laid out in this example: https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

thehapax (Collaborator, Author) commented Apr 11, 2019

  • Split core and frontends. The core can be a standalone CLI app exposing an API, and different frontends (CLI / GUI / Web) can use this API to interact with the core

I agree on this aspect of splitting the core and frontends.

re: bitshares.Notify: let's wait for xeroc to see if he is implementing this and what happens.

As for multiprocessing, I don't think this is the best way. Perhaps multithreading.

thehapax (Collaborator, Author) commented

The state machine has been removed and merged into devel.

thehapax (Collaborator, Author) commented Apr 22, 2019

Something to consider for design.

  • Future strategies may be based on computation-heavy AI/ML (per a conversation with Gabriel).
    We should think about how to make these separate processes connect with the dexbot strategy framework.

bitphage (Collaborator) commented

A strategy can use a ProcessPoolExecutor to run heavy computations without blocking the main process. https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

thehapax (Collaborator, Author) commented May 9, 2019

Ok, sounds good. I'll take a look into it.

MarkoPaasila (Collaborator) commented May 29, 2019

I like the queue idea for using the pybitshares instance. That will allow for maximizing throughput of one bitshares instance. Would a step forward be to have pybitshares operate from a queue (no async yet), and separate each worker into a thread of its own? That way workers would have a break while waiting for data, giving other workers the opportunity to add their requests to the queue; then the earlier workers would resume. I would imagine that could speed things up by a factor of two, and async would yield another 10x later on.
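The queue-plus-threads idea above, as a minimal sketch (all names are illustrative, not pybitshares APIs): worker threads post (request, reply_queue) pairs onto one shared queue, and a single consumer thread stands in for the lone pybitshares instance, answering one request at a time:

```python
import queue
import threading

requests = queue.Queue()

def node_consumer():
    """Plays the role of the single pybitshares instance."""
    while True:
        item = requests.get()
        if item is None:              # shutdown sentinel
            break
        req, reply = item
        reply.put("answer:" + req)    # pretend this is an RPC round-trip

def worker(name, results):
    reply = queue.Queue()
    requests.put((name + "-ticker", reply))
    results[name] = reply.get()       # this worker waits; others keep queueing

consumer = threading.Thread(target=node_consumer)
consumer.start()

results = {}
workers = [threading.Thread(target=worker, args=(n, results))
           for n in ("w1", "w2")]
for t in workers:
    t.start()
for t in workers:
    t.join()
requests.put(None)
consumer.join()
```

While one worker blocks on its reply, the others are free to enqueue their own requests, which is exactly the interleaving described above.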

How hard would it be to make an API to separate the GUI from the core? There's not terribly much communication going on there, I guess. What kind of API would be best for enabling different frontends? A web interface over LAN or ssh would be a nice possibility, and Telegram control too :-).

bitphage (Collaborator) commented May 29, 2019

How hard would it be to make an API to separate gui from core?

#582

Would a step forward be to have pybitshares operate from a queue

Not sure it will be worth the effort. Here is the asyncio progress: https://github.com/Codaone/DEXBot/projects/14

thehapax (Collaborator, Author) commented May 29, 2019

Would a step forward be to have pybitshares operate from a queue (no async yet), and separate each worker into a thread of its own?

Hi @MarkoPaasila - this is the same thing I thought would be a temporary solution. I tried doing this last week using a thread pool (an extension of a thread queue) and no async; it does speed up the output, but it has thread collisions from the single shared bitshares instance. What's interesting is that the thread collisions are not consistent, so sometimes it takes several attempts in testing for the collisions to show up.

Also tried spawning multiple bitshares instances and running them in separate processes instead of threads, but this also has collision problems; see #589 (comment).

The first attempt at creating multiple BitShares() instances gave an error in appendSigner, where the wallet is locked and the other threads try to seize the lock at the same time. It doesn't happen every time, but it can:

  File "/Users/octomatic/Library/Python/3.7/lib/python/site-packages/graphenelib-1.1.18-py3.7.egg/graphenecommon/transactionbuilder.py", line 285, in appendSigner
    raise WalletLocked()

In other cases, for example from within SO, grapheneapi.exceptions.RPCError: Assert Exception: now <= trx.expiration:

2019-05-27 17:40:55,154 - SO-ETH using account octet3 on SPARKDEX.ETH/OPEN.ETH - ERROR - Got exception during broadcasting trx:
Traceback (most recent call last):
  File "/Users/octomatic/Library/Python/3.7/lib/python/site-packages/graphenelib-1.1.18-py3.7.egg/grapheneapi/api.py", line 168, in func
    r = func(*args, **kwargs)
  File "/Users/octomatic/Library/Python/3.7/lib/python/site-packages/graphenelib-1.1.18-py3.7.egg/grapheneapi/rpc.py", line 138, in method
    message = self.parse_response(r)
  File "/Users/octomatic/Library/Python/3.7/lib/python/site-packages/graphenelib-1.1.18-py3.7.egg/grapheneapi/rpc.py", line 106, in parse_response
    raise RPCError(ret["error"]["message"])
grapheneapi.exceptions.RPCError: Assert Exception: now <= trx.expiration: 

which is linked to graphenelib's transactionbuilder.py:

        try:
            if self.blockchain.blocking:
                ret = self.blockchain.rpc.broadcast_transaction_synchronous(
                    ret, api="network_broadcast"
                )
                ret.update(**ret.get("trx", {}))
            else:
                self.blockchain.rpc.broadcast_transaction(ret, api="network_broadcast")
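The WalletLocked race above can be shown in miniature with a toy wallet (this is illustrative only, not pybitshares code): if the unlock/sign/lock sequence is not atomic across threads, one thread can re-lock the wallet while another is mid-signing. Guarding the whole sequence with a single shared lock serializes it:

```python
import threading

class ToyWallet:
    """Stand-in for a wallet that refuses to sign while locked."""
    def __init__(self):
        self.locked = True
    def unlock(self):
        self.locked = False
    def lock(self):
        self.locked = True
    def sign(self):
        if self.locked:
            raise RuntimeError("WalletLocked")
        return "signed"

wallet = ToyWallet()
guard = threading.Lock()
results = []

def broadcast():
    with guard:                   # serialize unlock -> sign -> lock
        wallet.unlock()
        results.append(wallet.sign())
        wallet.lock()

threads = [threading.Thread(target=broadcast) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the `with guard:` block, thread A's `wallet.lock()` can land between thread B's `unlock()` and `sign()`, which is intermittent by nature, matching the "doesn't happen every time" behavior described above.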

thehapax (Collaborator, Author) commented

See #589; closing to avoid duplicate confusion.
