Websocket provider #566
Comments
I found this absolutely hilarious for some reason.... |
I suspect we can get by with a minimal initial implementation which operates synchronously much the same way the IPC provider does? I am not well versed in websockets so would need to validate this assertion. |
I experimented with this a while ago, I'm afraid it can't be easily added. Websockets require a persistent connection and a running event loop. Here it gets tricky as async calls spread to the very top. Other than that, the API is the same. The one approach I see feasible with the current design is adding an
A quick example:
import json
import asyncio
import websockets


async def main():
    ws = await websockets.connect('wss://mainnet.infura.io/ws')
    request = dict(jsonrpc='2.0', id=1, method='eth_blockNumber', params=[])
    await ws.send(json.dumps(request))
    response = await ws.recv()
    print(json.loads(response)['result'])


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main()) |
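The request/response framing in the example above does not depend on the transport, so it can be sketched and checked without a node. This is a minimal sketch, not web3.py's implementation: `build_request`, `parse_response` and the canned reply are hypothetical names and data introduced only for illustration.

```python
import itertools
import json

# Hypothetical id source for this sketch; not taken from web3.py.
_request_ids = itertools.count(1)


def build_request(method, params=None):
    """Build a JSON-RPC 2.0 request string with a fresh id."""
    payload = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": method,
        "params": params or [],
    }
    return json.dumps(payload)


def parse_response(raw):
    """Return the `result` field, or raise if the reply carries an error."""
    response = json.loads(raw)
    if "error" in response:
        raise ValueError(response["error"])
    return response["result"]


request = build_request("eth_blockNumber")
# A canned reply standing in for what `ws.recv()` would return (assumption).
canned = json.dumps({"jsonrpc": "2.0", "id": 1, "result": "0x4b7"})
block_number = int(parse_response(canned), 16)
```

With helpers like these, only the `send`/`recv` pair in the middle would differ between an HTTP, IPC or websocket transport.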
The performance spread between HTTP and IPC is higher than 2x in my experience. Did you separately test against IPC, or are you just bundling them together after running HTTP tests? BTW, I'm not arguing against a WebSockets provider; it's valuable enough to implement if it's only faster than HTTP, for services that are only available remotely. But I am genuinely curious how it compares specifically to IPC. |
This isn't super different from IPC. A websockets implementation could probably use something similar to IPC's |
@pipermerriam @carver I'd love to see websockets available in web3py. In fact, it looks like Infura wants to see it too. Is this on the roadmap soon? Is it a good bounty-able issue, perhaps given the route that @pipermerriam suggests here?
|
I'm not sure if they need to be done in conjunction with each other, but this issue complements the websockets one. Since both need |
Main issue has been updated to reflect what I think is the minimum necessary implementation. |
AIUI, the new
|
I would like to work on this. I am not an asyncio user, however I have experience with twisted. I should get started on this this weekend. @pipermerriam @carver Would it be fine to add a third party dependency like websockets? It seems to be built on top of asyncio. |
This issue now has a funding of 0.4 ETH (263.2 USD @ $658.0/ETH) attached to it.
|
Work has been started on the 0.4 ETH (168.93 USD @ $422.33/ETH) funding by: Please work together and coordinate delivery of the issue scope. Gitcoin doesn't know enough about everyone's skillsets / free time to say who should work on what, but we trust that the community is smart and well-intentioned enough to work together. As a general rule, if you start work first, you'll be at the top of the above list ^^, and should have 'dibs' as long as you follow through. On the above list? Please leave a comment to let the funder (@owocki) and the other parties involved know what you're working on with respect to this issue and your plans to resolve it. If you don't leave a comment, the funder may expire your submission at their discretion.
|
@voith take a look at the main issue description. It lays out some basics for how this should be implemented and tested (and suggests using the Looking forward to seeing this get started. Please get a pull request open this weekend as soon as you've started your work (we'll mark it as a work in progress). |
@voith i was looking at this at one point but then got totally distracted with connection pools (e.g., ws4py) and autobahn's client factory. also, rust-web3 has an interesting take on channels. below is a very rudimentary and self-contained (but wasteful) approach using futures. maybe it's of some value to you.
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import os
import json
import asyncio
import websockets
from web3.providers.base import (
    JSONBaseProvider,
)


def get_default_endpoint():
    return os.environ.get('WEB3_WS_PROVIDER_URI', 'ws://localhost:8546')


class WebsocketProvider(JSONBaseProvider):
    # don't think we need to provide request kwargs here
    loop = None

    # consider the possibility of passing the loop
    def __init__(self, endpoint_uri=None):
        self.endpoint_uri = endpoint_uri
        if self.endpoint_uri is None:
            self.endpoint_uri = get_default_endpoint()
        if self.loop is None:
            self.loop = asyncio.get_event_loop()
        super().__init__()

    def __str__(self):
        return "WS connection {0}".format(self.endpoint_uri)

    def __del__(self):
        # self.loop.close() # ha,ha think async!!
        pass

    def _is_ws_connected(self):
        # don't need it if we're doing the eth_protocolVersion call to check
        # and don't use a connection pool manager. but if we do, make it async
        raise NotImplementedError()

    def _close_ws_connection(self):
        # don't need it if we use the context manager but if we do,
        # make it async
        raise NotImplementedError()

    async def rpc_call(self, request_data, future):
        # assert request_data
        # opens a fresh connection per call, which is the "wasteful" part
        async with websockets.connect(self.endpoint_uri) as ws:
            await ws.send(request_data)
            result = await ws.recv()
            future.set_result(result)

    def make_request(self, request_data):
        # def make_request(self, method, params):
        # request_data = self.encode_rpc_request(method, params)
        loop = self.loop
        future = asyncio.Future()
        asyncio.ensure_future(self.rpc_call(request_data, future))
        # block the calling thread until the coroutine has set the result
        loop.run_until_complete(future)
        result = future.result()
        return json.loads(result)


def lame_test(net_id='3', proto_ver='63', syncing=False):
    call_q = [({"jsonrpc": "2.0", "method": "eth_protocolVersion", "params": [], "id": 1}, proto_ver),
              ({"jsonrpc": "2.0", "method": "eth_syncing", "params": [], "id": 2}, syncing),
              ({"jsonrpc": "2.0", "method": "net_version", "params": [], "id": 3}, net_id)
              ]
    uri = 'ws://127.0.0.1:8546'
    # one provider per request
    for payload in call_q:
        WSP = WebsocketProvider(endpoint_uri=uri)
        assert WSP.make_request(json.dumps(payload[0]))['result'] == payload[1]
    # one provider shared across requests
    WSP = WebsocketProvider(endpoint_uri=uri)
    for payload in call_q:
        assert WSP.make_request(json.dumps(payload[0]))['result'] == payload[1]


if __name__ == '__main__':
    lame_test()
the handling of the event loop(s) and the possible reuse/keep alive of the connections should probably be viewed in light of the overall async strategy. regardless, a ws connection pool is imo the way to go. bon chance. |
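The blocking `make_request` idea above can also be sketched without the intermediate future, by handing the coroutine straight to `run_until_complete`. This is a sketch under stated assumptions, not the provider that was eventually merged: `fake_rpc_call` is a hypothetical stand-in for the websocket send/recv pair so the example runs without a node, and `SyncFacade` is an illustrative name.

```python
import asyncio
import json


async def fake_rpc_call(request_data):
    # Hypothetical stand-in for "send over the websocket, then recv".
    await asyncio.sleep(0)
    request = json.loads(request_data)
    return json.dumps({"jsonrpc": "2.0", "id": request["id"], "result": "0x1"})


class SyncFacade:
    """Expose an async call through a blocking method on a private loop."""

    def __init__(self, loop=None):
        # A private loop avoids depending on whatever loop is current.
        self.loop = loop or asyncio.new_event_loop()

    def make_request(self, request_data):
        # Blocks the calling thread until the coroutine completes.
        raw = self.loop.run_until_complete(fake_rpc_call(request_data))
        return json.loads(raw)

    def close(self):
        self.loop.close()


facade = SyncFacade()
payload = {"jsonrpc": "2.0", "method": "net_version", "params": [], "id": 7}
response = facade.make_request(json.dumps(payload))
facade.close()
```

Note that `run_until_complete` cannot be called while the same loop is already running, so a facade like this only fits callers that are themselves synchronous.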
STATUS UPDATE:
@pipermerriam I tried looking for similar tests written for other providers but I couldn't find any. If such tests exist, can you point me to the code? I've had a look at the unit tests for the providers and also the UPDATE: Found the tests |
Also, I have tested some basic methods manually.
This is the response returned:
|
never mind, found the tests! |
@voith You should get a pull request open and we can start from there. |
Also to use rinkeby, you'll need this: http://web3py.readthedocs.io/en/latest/middleware.html#geth-style-proof-of-authority |
@pipermerriam Thanks! The middleware fixed the issue. |
Our preference is for you to open the PR as soon as you have code. Pull requests do not need to be polished or done. This can save you a lot of time since that allows us to have a look at your implementation and give feedback/direction earlier on. |
I'm posting the design decision held between @pipermerriam and @gsalgado on gitter, to keep track. |
While trying to address #708 (comment), I came up with the following solution:
import asyncio
import websockets
from threading import (
    current_thread,
    Thread,
)
from web3.providers.base import (
    JSONBaseProvider,
)

THREAD_POLLING_INTERVAL = 2  # secs


async def _stop_loop(loop, parent_thread):
    while parent_thread.is_alive():
        await asyncio.sleep(THREAD_POLLING_INTERVAL)
    else:
        loop.stop()


def _start_event_loop(loop, parent_thread):
    asyncio.set_event_loop(loop)
    asyncio.run_coroutine_threadsafe(_stop_loop(loop, parent_thread), loop)
    loop.run_forever()
    loop.close()


def _get_threaded_loop():
    new_loop = asyncio.new_event_loop()
    thread_loop = Thread(target=_start_event_loop, args=(new_loop, current_thread()))
    thread_loop.start()
    return new_loop


class WebsocketProvider(JSONBaseProvider):
    _loop = None

    def __init__(self, endpoint_uri=None):
        if self._loop is None:
            self._loop = _get_threaded_loop()
        ...
The solution has been inspired by: https://stackoverflow.com/a/18643557/3526700 Since the event_loop runs in a separate thread, the problem I was having was to figure out a clean way to exit the program whenever the main thread would exit. The main culprit here is
@pipermerriam @carver Can you think of a better solution? Or is this good for now? |
This seems okay, probably. I also wish for there to be an easier way. But sometimes with threading, it's tough to find one. Just to ask the question. Could we just get away with putting the asyncio loop on a |
I am not an expert in threading, however from what I've understood daemon threads are used to run background tasks that are only useful when the main program runs. In this case, |
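For comparison with the polling approach, here is a minimal sketch of running the loop on a daemon thread and submitting work to it from the main thread. It assumes that abrupt termination of the loop thread at interpreter exit is acceptable; `start_daemon_loop` and `echo` are hypothetical names used only for illustration.

```python
import asyncio
import threading


def start_daemon_loop():
    """Run a fresh event loop forever on a daemon thread."""
    loop = asyncio.new_event_loop()
    # daemon=True: the interpreter will not wait for this thread at exit.
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def echo(value):
    # Hypothetical stand-in for an async websocket round trip.
    await asyncio.sleep(0)
    return value


loop, thread = start_daemon_loop()
# Schedule the coroutine on the other thread's loop and block for the result.
future = asyncio.run_coroutine_threadsafe(echo("pong"), loop)
result = future.result(timeout=5)
# Ask the loop to stop from the main thread (thread-safe).
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
```

The daemon flag removes the need for a watcher coroutine, at the cost of giving the loop no chance to close open connections if the process exits without the explicit stop.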
when i looked at it, approaching it from the perspective of self-contained event loops seemed to make the most sense given the existing sync architecture. from a think-async perspective not optimal but given the code base, it's a pretty good approach. wrapping each loop in its own thread, as suggested by @carver, gives you also the basis for the socket registration pool i alluded to earlier and a guaranteed cleanup on exit. fundamentally, it's a gevent approach without the covers. |
@voith while daemonized cleans up threads, some of the (IO) resources may be locked for a bit. generally not for very long but if it's a concern, using threading.Event in a graceful exit method with non-daemonized threads would alleviate that issue. you probably want to cascade closing each ws connection while the thread is still alive. |
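The graceful-exit idea above can be sketched as follows: a non-daemon loop thread, an explicit `stop()` that first runs a cleanup coroutine on the loop, and a `threading.Event` that signals when the loop has actually closed. This is a sketch under assumptions, not code from the thread: `LoopThread`, `close_all` and the `closed` list are hypothetical, and the cleanup only records names where real code would close websocket connections.

```python
import asyncio
import threading


class LoopThread:
    """Event loop on a non-daemon thread with an explicit graceful stop."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.stopped = threading.Event()
        self.thread = threading.Thread(target=self._run)  # non-daemon on purpose

    def _run(self):
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_forever()
        finally:
            self.loop.close()
            self.stopped.set()  # signal that the loop is fully closed

    def start(self):
        self.thread.start()

    def stop(self, cleanup=None, timeout=5):
        if cleanup is not None:
            # Run the cleanup coroutine on the loop thread and wait for it.
            asyncio.run_coroutine_threadsafe(cleanup(), self.loop).result(timeout)
        self.loop.call_soon_threadsafe(self.loop.stop)
        return self.stopped.wait(timeout)


closed = []


async def close_all():
    # Hypothetical cleanup: real code would await ws.close() per connection.
    for name in ("ws-1", "ws-2"):
        closed.append(name)


lt = LoopThread()
lt.start()
stopped = lt.stop(cleanup=close_all)
lt.thread.join(timeout=5)
```

Because the thread is not a daemon, forgetting to call `stop()` keeps the process alive, which is the trade-off against the daemon variant.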
@boneyard93501 Thanks for weighing in!
You're absolutely right. I wasn't having any problems till now as I was creating and closing a new websocket connection for every request. The moment I implemented a persistent connection, I started getting some spurious errors.
class PersistentWebSocket:

    def __init__(self, endpoint_uri, loop):
        self.ws = None
        self.endpoint_uri = endpoint_uri
        self.loop = loop

    def __del__(self):
        if self.ws:
            self.loop.run_until_complete(self.ws.close())

    async def __aenter__(self):
        # connect lazily and keep the connection for later requests
        if self.ws is None:
            self.ws = await websockets.connect(uri=self.endpoint_uri, loop=self.loop)
        return self.ws

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # drop the connection only if the request raised
        if exc_val is not None:
            try:
                # close() is a coroutine, so it has to be awaited
                await self.ws.close()
            except Exception:
                pass
            self.ws = None
Now I get this error:
The problem here is that the event loop is stopped before
I also tried to wait for the pending tasks to finish:
async def _stop_loop(loop, parent_thread):
    while parent_thread.is_alive():
        await asyncio.sleep(THREAD_POLLING_INTERVAL)
    else:
        all_tasks = asyncio.Task.all_tasks(loop)
        current_task = asyncio.Task.current_task(loop)
        pending_tasks = all_tasks - {current_task}
        loop.run_until_complete(asyncio.gather(*pending_tasks))
        loop.stop()
But this waits forever. Garbage collection doesn't work as I expected it to. However, I don't get this problem with
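As a point of comparison, here is a sketch of draining a loop by cancelling the pending tasks rather than waiting for them, awaited from inside the loop instead of calling `run_until_complete` on a loop that is already running. It uses `asyncio.all_tasks` and `asyncio.current_task`, the module-level replacements for the `asyncio.Task.all_tasks` / `asyncio.Task.current_task` class methods (which were removed in Python 3.9). `_forever`, `shutdown_pending` and `demo` are hypothetical names, and whether cancellation is acceptable for in-flight requests is an assumption.

```python
import asyncio


async def _forever():
    # Stand-in for a task that never finishes on its own (hypothetical).
    while True:
        await asyncio.sleep(0.01)


async def shutdown_pending():
    """Cancel every task except the current one and wait for them to unwind."""
    current = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks() if task is not current]
    for task in pending:
        task.cancel()
    # return_exceptions=True collects CancelledError instead of raising it.
    return await asyncio.gather(*pending, return_exceptions=True)


async def demo():
    asyncio.ensure_future(_forever())
    await asyncio.sleep(0)  # give the background task a chance to start
    return await shutdown_pending()


results = asyncio.run(demo())
```

Waiting on a task that never completes blocks indefinitely by construction, so some form of cancellation or timeout is needed before the loop can be stopped.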
|
Looks like using |
@voith @pipermerriam i don't think you want to treat ws provider analogous to httpprovider, which means reuse and requires some additional management at the request manager level. otherwise you may end up not only blocking a lot of ports on the client side but possibly bring the ws server on the node to a grinding halt. not likely but plausible, especially in a multi-user per node situation. let's say you have a (global) deque, called ws_registry, possibly pre-seeded with, say, five ws connections at startup (as part of the Web3(WSProvider(...)) init), and the modified make_request procedure pseudo-looks like so:
you ought to get away with ws clients based on a slightly modified websocket echo client example which is reasonably clean. add ws_registry.appendleft(ws) to the on_open and ws_registry.remove(ws) to the on_close, and i think you're mostly covered. it's still far from ideal but imo you end up reusing connections and getting the real ws benefits. |
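The registry idea described above can be sketched with a plain `collections.deque`. This is an illustration under assumptions, not the elided pseudo-code from the comment: `FakeConnection` stands in for an open websocket, `ConnectionRegistry` is a hypothetical name, and the sketch is not thread-safe.

```python
from collections import deque


class FakeConnection:
    """Hypothetical stand-in for an open websocket connection."""

    def __init__(self, name):
        self.name = name
        self.open = True


class ConnectionRegistry:
    """Keep idle connections in a deque and hand them out for reuse."""

    def __init__(self, factory, size=5):
        self._factory = factory
        # Pre-seed the registry, as suggested for provider start-up.
        self._idle = deque(factory(i) for i in range(size))

    def acquire(self):
        # Reuse an idle connection if one is still open, else create one.
        while self._idle:
            conn = self._idle.popleft()
            if conn.open:
                return conn
        return self._factory(None)

    def release(self, conn):
        # Closed connections are dropped instead of being returned.
        if conn.open:
            self._idle.appendleft(conn)

    def __len__(self):
        return len(self._idle)


registry = ConnectionRegistry(FakeConnection, size=2)
first = registry.acquire()
idle_after_acquire = len(registry)
registry.release(first)
reused = registry.acquire()
```

A real pool would also need locking (or confinement to the loop thread) and a cap on how many extra connections `acquire` may open.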
from what I've explored, @boneyard93501 If I understand you correctly, I think you are worried about the number of connections that will be opened up in a multi websocket provider situation and the solution you're proposing is to use a global
I don't think the |
Let's assume connection pooling is out of scope for this initial implementation. I agree that it may be an issue, but I want to get the core functionality into the main codebase before we start optimizing things. |
Will work on this tomorrow. I won't be available this weekend, so I'll try my best to finish this before Thursday. |
Work for 0.4 ETH (149.73 USD @ $374.31/ETH) has been submitted by: Submitters, please leave a comment to let the funder (@owocki) (and the other parties involved) know that you've submitted your work. If you don't leave a comment, the funder may expire your submission at their discretion.
|
@carver @pipermerriam 👌 to payout? |
delegating to @carver |
Good to go 👍 |
The funding of 0.4 ETH (152.43 USD @ $381.06/ETH) attached to this issue has been approved & issued to @voith.
|
Thanks guys, I have received the bounty :) |
I used threading with daemons at first and then I switched to celery to deploy and transact with functions async. I had no experience with asyncio so that's the decision I chose. I know it probably doesn't matter, I just wanted to give my input on how I solved it in our production application. |
Hi @dangell7 Good to know! |
What is wrong?
Websockets have some nice benefits over HTTP-based connections to a node.
HttpProvider in favor of WebsocketProvider
How it can be fixed.
Whoever takes this on is going to be committing to figuring out some unknown unknowns. It's unclear at the time of writing this issue how much work adding this provider is going to require.
The naive implementation plan is as follows:
Implement a new class web3.providers.websocket.WebsocketProvider. This class should use the websockets library linked above to establish (or reuse an existing) websocket connection to send the JSON-RPC request and get the response.
Definition of Done
WebsocketProvider alongside the other provider documentation.
WebsocketProvider including a full working example of running the event loop in a small example app.
pytest-asyncio