Implement async versions of long-running methods #657
Things that stand out as needing a coroutine API are:
We'll need to deal with computed properties as well, or potentially not support them in the async portion of the API. E.g., new APIs will need to be created to expose some sort of callback-based API for filters.
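As a strawman for the callback-based filter API mentioned above, here is a minimal sketch. Everything in it (the `AsyncFilter` name, the `watch` method, the injected fetcher) is hypothetical and illustrative, not part of web3.py; the fetcher is stubbed so the example runs without a node.

```python
import asyncio

# Hypothetical sketch only: a callback-based filter API in place of the
# blocking computed-property style. All names here (AsyncFilter, watch,
# the stubbed fetcher) are illustrative, not part of web3.py.
class AsyncFilter:
    def __init__(self, get_new_entries, poll_interval: float = 0.01):
        self._get_new_entries = get_new_entries  # would query the node in real use
        self._poll_interval = poll_interval

    async def watch(self, callback, polls: int = 3):
        # poll a bounded number of times, invoking the callback per new entry
        for _ in range(polls):
            for entry in self._get_new_entries():
                callback(entry)
            await asyncio.sleep(self._poll_interval)

# usage with a stubbed fetcher instead of a live node
seen = []
batches = iter([[1, 2], [], [3]])
flt = AsyncFilter(lambda: next(batches, []))
asyncio.run(flt.watch(seen.append))
# seen == [1, 2, 3]
```

The point of the shape is that the caller never touches a computed property; new entries are pushed through the callback as the coroutine polls.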
To put a stake in the ground and get the blood boiling, below are a few approaches (unoptimized, exploratory in scope) to async in the context of batch processing. Without re-writing the providers, concurrent.futures is the best (performance) bet. However, the GIL doesn't care where and how threads are spawned, and generally performance degrades at an increasing rate as threads are added beyond some "golden" number; on smaller AWS EC2 t-type instances that seems to be around 30 to 50 threads, on my old desktop it's about 120 threads, and on my laptop about 150 threads. So there is a contextual aspect to this. The aiohttp example is a proxy for where a re-written HTTP provider method might fall performance-wise compared to "wrapped" existing (HTTP) provider methods. I haven't done any serious profiling, but the ranking seems about right, although the RPC methods used are too simple to make any serious claims. As you can see, I had to use new_event_loop(s), which is pretty much inevitable without an overarching event loop policy in place. So one aspect of async to consider is to spawn one event loop as part of web3.async. In addition to re-writing async core methods, such as the providers, we'd have to consider event loop registration and de-registration for various tasks: a sort of pluggable loop system.

#!/usr/bin/env python3
# -*- coding: utf8 -*-
import aiohttp
import asyncio
import datetime
import random
import time
import ujson
from collections import deque
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Union  # noqa: F401
from web3 import Web3, HTTPProvider, IPCProvider, WebsocketProvider
class BatchHandlerThreadPool:
    ''' wrap existing providers (despite the name, this drives a process pool) '''
    provider_map = {'http': HTTPProvider, 'ipc': IPCProvider, 'ws': WebsocketProvider}

    def __init__(self, uri: str = '127.0.0.1:8545', provider_type: str = 'http',
                 chunk_size: int = 50, max_pool_size: int = 10):
        self.uri = uri
        self.provider = self._check_providers(provider_type)
        self.chunk_size = chunk_size
        self.max_pool_size = max_pool_size

    def _check_providers(self, provider_type: str) -> type:
        if provider_type.lower() not in BatchHandlerThreadPool.provider_map:
            msg = 'invalid provider type {} requested. Valid types are {}.'
            raise TypeError(msg.format(provider_type, list(BatchHandlerThreadPool.provider_map)))
        return BatchHandlerThreadPool.provider_map[provider_type]

    def make_requests(self, job: dict) -> tuple:
        ''' execute a single rpc job through a wrapped provider '''
        try:
            w3 = Web3(self.provider(self.uri))
            m = job['method']
            p = job['params']
            res = w3.providers[0].make_request(m, p)
            result = (job, res, '')
        except Exception as exc:
            print("exc: ", exc)
            result = (job, '', exc)
        return result

    def chunked_payload(self, payload: list):
        ''' yield successive chunk_size-sized slices of the payload '''
        for i in range(0, len(payload), self.chunk_size):
            yield payload[i: i + self.chunk_size]

    def run_batch(self, payload: list, callback_fn: Any):
        ''' submit jobs chunk by chunk and feed results to the callback '''
        with ProcessPoolExecutor(max_workers=self.max_pool_size) as executor:
            for jobs in self.chunked_payload(payload):
                futures = [executor.submit(self.make_requests, job) for job in jobs]
                # keep things in line with the "generic" client
                for future in futures:
                    callback_fn(future.result())
class BatchHandlerTasks:
    ''' wrap existing providers in coroutines '''
    provider_map = {'http': HTTPProvider, 'ipc': IPCProvider, 'ws': WebsocketProvider}

    def __init__(self, uri: str = '127.0.0.1:8545', chunk_size: int = 50, provider_type: str = 'http'):
        self.uri = uri
        self.provider = self._check_providers(provider_type)
        self.chunk_size = chunk_size

    def _check_providers(self, provider_type: str) -> type:
        if provider_type.lower() not in BatchHandlerTasks.provider_map:
            msg = 'invalid provider type {} requested. Valid types are {}.'
            raise TypeError(msg.format(provider_type, list(BatchHandlerTasks.provider_map)))
        return BatchHandlerTasks.provider_map[provider_type]

    async def make_request(self, job: dict) -> tuple:
        ''' execute a single rpc job; note the underlying provider call still blocks '''
        provider = Web3(self.provider(self.uri)).providers[0]
        try:
            m = job['method']
            p = job['params']
            res = provider.make_request(m, p)
            result = (job, res, '')
        except Exception as exc:
            result = (job, '', exc)
        return result

    async def req_wrapper(self, job, callback_fn):
        result = await self.make_request(job)
        callback_fn(result)  # the client callback is a plain (sync) function

    def chunked_payload(self, payload: list):
        ''' yield successive chunk_size-sized slices of the payload '''
        for i in range(0, len(payload), self.chunk_size):
            yield payload[i: i + self.chunk_size]

    def run_batch(self, payload: list, callback_fn: Any):
        ''' run each chunk to completion on a fresh event loop '''
        batch_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(batch_loop)
        for jobs in self.chunked_payload(payload):
            tasks = [self.req_wrapper(job, callback_fn) for job in jobs]
            batch_loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        batch_loop.close()
class BatchHandlerAioHttpTasks:
    ''' use aiohttp as a proxy for native asyncio providers (streams for ipc) '''
    provider_map = {'http': HTTPProvider, 'ipc': IPCProvider, 'ws': WebsocketProvider}

    def __init__(self, uri: str = '127.0.0.1:8545', chunk_size: int = 50, provider_type: str = 'http'):
        self.uri = uri
        self.provider = self._check_providers(provider_type)
        self.chunk_size = chunk_size

    def _check_providers(self, provider_type: str) -> type:
        if provider_type.lower() not in BatchHandlerAioHttpTasks.provider_map:
            msg = 'invalid provider type {} requested. Valid types are {}.'
            raise TypeError(msg.format(provider_type, list(BatchHandlerAioHttpTasks.provider_map)))
        return BatchHandlerAioHttpTasks.provider_map[provider_type]

    async def make_requests(self, job: dict, callback_fn: Any):
        ''' post the rpc payload directly over aiohttp '''
        _rpc = {"jsonrpc": "2.0", "id": 1001}
        payload = {**_rpc, **job}
        async with aiohttp.ClientSession(json_serialize=ujson.dumps) as session:
            res = await session.post(self.uri, json=payload)
            try:
                if res.status == 200:
                    result = (job, await res.json(), '')
                else:
                    result = (job, '', aiohttp.ClientResponseError)
            except Exception as exc:
                result = (job, '', exc)
        callback_fn(result)  # the client callback is a plain (sync) function

    def chunked_payload(self, payload: list):
        ''' yield successive chunk_size-sized slices of the payload '''
        for i in range(0, len(payload), self.chunk_size):
            yield payload[i: i + self.chunk_size]

    def run_batch(self, payload: list, callback_fn: Any):
        ''' run each chunk to completion on a fresh event loop '''
        batch_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(batch_loop)
        for jobs in self.chunked_payload(payload):
            tasks = [self.make_requests(job, callback_fn) for job in jobs]
            batch_loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        batch_loop.close()
def illustrative_async_client(batch_handler_cls, batch_q: deque, chunk_size: int, batch_multiplier: int = 1):

    def callback_fn(result):
        batch_q.append(result)

    uri = 'https://ropsten.infura.io'
    provider_type = 'http'
    payload = []
    payload.extend([{'method': 'web3_clientVersion', 'params': []}] * batch_multiplier)
    payload.extend([{'method': 'net_version', 'params': []}] * batch_multiplier)
    random.shuffle(payload)
    batch_handler = batch_handler_cls(uri=uri, provider_type=provider_type, chunk_size=chunk_size)
    dt_start = datetime.datetime.utcnow()
    batch_handler.run_batch(payload, callback_fn)
    while len(batch_q) < len(payload):
        time.sleep(0.1)
    good_jobs, exc_jobs = [], []
    for res in batch_q:
        if res[1]:
            good_jobs.append(res)
        else:
            exc_jobs.append(res)
    dt_end = datetime.datetime.utcnow()
    res_vals = (batch_handler_cls.__name__, len(payload), len(good_jobs), len(exc_jobs),
                (dt_end - dt_start).total_seconds())
    msg = '{} processed {} jobs, {} successes and {} failures,'
    msg += ' in {} seconds.'
    print(msg.format(*res_vals))


if __name__ == '__main__':
    n = 100
    chunk_size = 15
    for batch_cls in [BatchHandlerThreadPool, BatchHandlerAioHttpTasks, BatchHandlerTasks]:
        # wouldn't put too much stock into the actual numbers other than maybe ranking
        illustrative_async_client(batch_cls, deque(), chunk_size, n)
(web3-4.2.1) 93501@iMac27 ~/devcloud/web3-async-batch/src $ python3 multiple_async_batcher.py
BatchHandlerThreadPool processed 200 jobs, 200 successes and 0 failures, in 3.489139 seconds.
BatchHandlerAioHttpTasks processed 200 jobs, 200 successes and 0 failures, in 9.77327 seconds.
BatchHandlerTasks processed 200 jobs, 200 successes and 0 failures, in 17.100553 seconds.
(web3-4.2.1) 93501@iMac27 ~/devcloud/web3-async-batch/src $

I've messed around a bit with asyncio streams for IPC provider examples but ran out of time. Writing async versions of the web3 HTTP and IPC providers, respectively, doesn't seem all that challenging (did I just write that? nooooo), but in the end it depends on the general approach. Finally, there are performance improvements to be had by going outside of the standard asyncio module. Using uvloop, for example, makes a significant difference speeding up the aiohttp-based example (by about 3 seconds; I didn't see much, if any, improvement in the "wrapped" HTTPProvider). Torch it, shred it, tear it apart.
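The "pluggable loop" idea from the comment above could be sketched roughly as below: the library owns one long-lived event loop that work is registered against, instead of each batch handler calling `new_event_loop()`. The `LoopRegistry` name and its API are purely illustrative, not a proposed web3.py interface; the uvloop hook is optional and only engages if the package is installed.

```python
import asyncio

# Optional drop-in speedup mentioned above; falls back to the stdlib loop.
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass

class LoopRegistry:
    ''' hypothetical shared-loop owner; names are illustrative only '''
    def __init__(self):
        self._loop = asyncio.new_event_loop()

    def run(self, *coros):
        # register a batch of coroutines and drive them to completion on the
        # shared loop; a fuller version would keep the loop running in the
        # background and hand out futures for de-registration
        async def _gather():
            return await asyncio.gather(*coros)
        return self._loop.run_until_complete(_gather())

    def close(self):
        self._loop.close()

async def fake_request(n):
    await asyncio.sleep(0)  # stand-in for provider i/o
    return n * 2

registry = LoopRegistry()
results = registry.run(fake_request(1), fake_request(2))
registry.close()
# results == [2, 4]
```

With one registry per process, the batch handlers above could share a loop (and a loop policy such as uvloop's) rather than building and tearing one down per `run_batch` call.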
@carver is the assumption that (async) users will handle the event loop correct? And, as a corollary, that the web3.async namespace/API is comprised of a collection of awaitable primitives, e.g. async providers?
@boneyard93501 Correct on both counts. Clarification on how I think our providers should work: rather than having sync and async providers, I think we should have the standard provider API be

class BaseProvider:
    def make_request(...):
        # standard blocking request fn

    async def make_async_request(...):
        # awaitable request fn
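To make the dual-API shape above concrete, here is a minimal runnable sketch. The transport is stubbed out (the real one would do the HTTP/IPC round trip), and deferring the blocking call to a thread is just one possible way to back the awaitable method, not the proposed implementation.

```python
import asyncio

# Minimal sketch of a provider exposing both a blocking and an awaitable
# request method. The transport is a stub so the example is self-contained.
class BaseProvider:
    def _transport(self, method, params):
        # placeholder for the actual blocking network call
        return {'jsonrpc': '2.0', 'id': 1, 'result': method}

    def make_request(self, method, params):
        # standard blocking request fn
        return self._transport(method, params)

    async def make_async_request(self, method, params):
        # awaitable request fn; here we push the blocking call onto a
        # worker thread so the caller's event loop is not blocked
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._transport, method, params)

provider = BaseProvider()
sync_res = provider.make_request('web3_clientVersion', [])
async_res = asyncio.run(provider.make_async_request('web3_clientVersion', []))
# both paths return the same stubbed payload
```

The appeal of this shape is that sync and async callers share one provider instance and one configuration surface; only the request entry point differs.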
Closing, tracking async in Issue #1413
What was wrong?
Some async callers want asyncio versions of methods available. Initial discussion in #574.
How it can be implemented.
TODO