From 32a640fac0baa9d204144e1d2d381a710e5d593c Mon Sep 17 00:00:00 2001 From: DevAlone Date: Sat, 6 Oct 2018 23:56:08 +0300 Subject: [PATCH] migrate to python3.6 --- checkers/base_checker.py | 2 +- collectors/abstract_collector.py | 33 +++++++++++++++------------ processor.py | 38 +++++++++++++++----------------- proxy_py/_settings.py | 10 ++++----- requirements.txt | 4 ++-- 5 files changed, 45 insertions(+), 42 deletions(-) diff --git a/checkers/base_checker.py b/checkers/base_checker.py index f63c67d..f405448 100644 --- a/checkers/base_checker.py +++ b/checkers/base_checker.py @@ -2,10 +2,10 @@ from proxy_py import settings import ssl +import aiohttp import aiosocks import asyncio import async_requests -import aiohttp class CheckerResult: diff --git a/collectors/abstract_collector.py b/collectors/abstract_collector.py index 96b64fe..446bf4e 100644 --- a/collectors/abstract_collector.py +++ b/collectors/abstract_collector.py @@ -1,4 +1,6 @@ # TODO: add wrapper for doing requests and saving its cookies and UserAgent +import asyncio + from proxy_py import settings import json @@ -35,20 +37,23 @@ async def collect(self): return [] async def _collect(self): - """Do not use! It is called on collector's processing automatically""" - - # TODO: uncomment when python 3.6 comes to ubuntu lts - # i = 0 - # async for proxy in self.collect(): - # if i > settings.COLLECTOR_MAXIMUM_NUMBER_OF_PROXIES_PER_REQUEST: - # break - - # yield proxy - # i += 1 - proxies = list(await self.collect()) - proxies = proxies[:settings.COLLECTOR_MAXIMUM_NUMBER_OF_PROXIES_PER_REQUEST] - self.last_processing_proxies_count = len(proxies) - return proxies + """Do not call yourself! 
It is called on collector's processing automatically""" + collect = self.collect() + if asyncio.iscoroutine(collect): + async def wrapper(f): + for item in (await f): + yield item + collect = wrapper(collect) + + i = 0 + async for proxy in collect: + if i >= settings.COLLECTOR_MAXIMUM_NUMBER_OF_PROXIES_PER_REQUEST: + break + + yield proxy + i += 1 + + self.last_processing_proxies_count = i async def load_state(self, state: models.CollectorState): """ diff --git a/processor.py b/processor.py index c554cf8..77dc480 100644 --- a/processor.py +++ b/processor.py @@ -164,17 +164,28 @@ async def process_collector_of_state(self, collector_state): self.logger.debug( "start processing collector of type \"{}\"".format(type(collector)) ) - proxies = await collector._collect() - if proxies: - self.logger.debug( - "got {} proxies from collector of type \"{}\"".format(len(proxies), type(collector)) - ) - await self.process_raw_proxies(proxies, collector_state.id) - else: + tasks = [] + number_of_proxies = 0 + async for proxy in collector._collect(): + number_of_proxies += 1 + tasks.append(self.process_raw_proxy(proxy, collector_state.id)) + + if len(tasks) > settings.NUMBER_OF_CONCURRENT_TASKS: + await asyncio.gather(*tasks) + tasks.clear() + + if tasks: + await asyncio.gather(*tasks) + + if number_of_proxies == 0: self.collectors_logger.warning( "got 0 proxies from collector of type \"{}\"".format(type(collector)) ) + else: + self.collectors_logger.info( + f"got {number_of_proxies} proxies from collector of type \"{type(collector)}\"" + ) except KeyboardInterrupt as ex: raise ex except BaseException as ex: @@ -187,19 +198,6 @@ async def process_collector_of_state(self, collector_state): # TODO: new proxies count await collectors_list.save_collector(collector_state) - async def process_raw_proxies(self, proxies, collector_id): - tasks = [] - - for proxy in proxies: - # TODO: refactor it - tasks.append(self.process_raw_proxy(proxy, collector_id)) - if len(tasks) > 
settings.NUMBER_OF_CONCURRENT_TASKS: - await asyncio.gather(*tasks) - tasks.clear() - - if tasks: - await asyncio.gather(*tasks) - async def process_raw_proxy(self, proxy, collector_id): self.logger.debug("processing raw proxy \"{}\"".format(proxy)) diff --git a/proxy_py/_settings.py b/proxy_py/_settings.py index 87330d5..37ef6f1 100644 --- a/proxy_py/_settings.py +++ b/proxy_py/_settings.py @@ -11,9 +11,9 @@ DATABASE_CONNECTION_ARGS = () DATABASE_CONNECTION_KWARGS = { - 'database': 'test', - 'user': 'test', - 'password': 'test', + 'database': 'proxy_py', + 'user': 'proxy_py', + 'password': 'proxy_py', 'max_connections': 20, } @@ -32,11 +32,11 @@ # 'local/collectors', # use to add your own collectors ] -NUMBER_OF_CONCURRENT_TASKS = 64 +NUMBER_OF_CONCURRENT_TASKS = 128 # makes aiohttp to not send more # than this number of simultaneous requests # works by common connector -NUMBER_OF_SIMULTANEOUS_REQUESTS = 64 +NUMBER_OF_SIMULTANEOUS_REQUESTS = 128 # the same, but per host NUMBER_OF_SIMULTANEOUS_REQUESTS_PER_HOST = NUMBER_OF_SIMULTANEOUS_REQUESTS diff --git a/requirements.txt b/requirements.txt index e382f7e..b5ef41e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ aiohttp==2.3.10 -aiosocks==0.2.5 +aiohttp-jinja2==0.16.0 +aiosocks lxml fake-useragent -aiohttp_jinja2 jinja2 peewee-async aiopg