Skip to content

Commit

Permalink
Acceptors performance (#767)
Browse files Browse the repository at this point in the history
* Use threads for delegation. `run_once` now acquires the lock before `accept` instead of before `select`

* Add support for using the master proxy within the proxy pool plugin. When used, the proxy pool plugin will be a no-op for the master node

* Fix acceptor tests now that mask is being used

* Use `cached_property` for web server routes

* Use `select(timeout=1)`; otherwise the acceptor won't join if `select` blocks indefinitely

* mypy, flake, doc spell fixes

* R0205: Class `cached_property` inherits from object, can be safely removed from bases in python3 (useless-object-inheritance)
  • Loading branch information
abhinavsingh authored Nov 20, 2021
1 parent 27f8503 commit 8fdddfd
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 66 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ dist
build

proxy/public
profile.svg
3 changes: 1 addition & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{
"editor.rulers": [100],
"editor.defaultFormatter": "esbenp.prettier-vscode",
"editor.formatOnSaveMode": "modifications",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
Expand All @@ -20,7 +19,7 @@
"typescript.preferences.quoteStyle": "single",
"[python]": {
"editor.wordBasedSuggestions": true,
"editor.defaultFormatter": "rvest.vs-code-prettier-eslint"
"editor.defaultFormatter": null
},
"python.testing.unittestEnabled": false,
"python.testing.autoTestDiscoverOnSaveEnabled": true,
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ lib-doc:
$(OPEN) .tox/build-docs/docs_out/index.html

lib-coverage:
pytest --cov=proxy --cov=tests --cov-report=html tests/
pytest --cov=proxy --cov=tests --cov-report=html tests/ && \
$(OPEN) htmlcov/index.html

lib-profile:
Expand All @@ -137,7 +137,7 @@ lib-profile:
--disable-http-proxy \
--enable-web-server \
--plugin proxy.plugin.WebServerPlugin \
--log-file /tmp/proxy.log
--log-file /dev/null

devtools:
pushd dashboard && npm run devtools && popd
Expand Down
2 changes: 1 addition & 1 deletion SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
Follow these steps:

1. Start by [emailing developers](mailto:[email protected])
2. If unresponsive, [create a public issue](https://github.com/abhinavsingh/proxy.py/issues/new/choose)
2. If unresponsive, [create a public issue](https://github.com/abhinavsingh/proxy.py/issues/new/choose) without disclosing details of the vulnerability itself.
3. [Pull requests](https://github.com/abhinavsingh/proxy.py/pulls) are always welcome
69 changes: 69 additions & 0 deletions proxy/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""
import sys
import ssl
import time
import socket
import logging
import functools
Expand Down Expand Up @@ -290,3 +291,71 @@ def set_open_file_limit(soft_limit: int) -> None:
logger.debug(
'Open file soft limit set to %d', soft_limit,
)


class cached_property:
    """Decorator for read-only properties evaluated only once within TTL period.

    It can be used to create a cached property like this::

        import random

        # the class containing the property must be a new-style class
        class MyClass:
            # create property whose value is cached for ten minutes
            @cached_property(ttl=600)
            def randint(self):
                # will only be evaluated every 10 min. at maximum.
                return random.randint(0, 100)

    The value is cached in the '_cached_properties' attribute of the object
    instance that has the property getter method wrapped by this decorator.
    The '_cached_properties' attribute value is a dictionary which has a key
    for every property of the object which is wrapped by this decorator. Each
    entry in the cache is created only when the property is accessed for the
    first time and is a two-element tuple with the last computed property
    value and the last time it was updated in seconds since the epoch.

    The default time-to-live (TTL) is 300 seconds (5 minutes). Set the TTL to
    zero for the cached value to never expire.

    To expire a cached property value manually just do::

        del instance._cached_properties[<property name>]

    Accessing the property through the class (rather than an instance)
    returns the descriptor object itself.

    Adopted from https://wiki.python.org/moin/PythonDecoratorLibrary#Cached_Properties
    © 2011 Christopher Arndt, MIT License.

    NOTE: We need this class only because Python in-built are only available
    for 3.8+. Hence, we must get rid of this class once proxy.py no longer
    support version older than 3.8.

    .. spelling::

       getter
       Arndt
    """

    def __init__(self, ttl: float = 300.0):
        # Time-to-live in seconds; a value <= 0 disables expiry.
        self.ttl = ttl

    def __call__(self, fget: Any, doc: Any = None) -> 'cached_property':
        """Bind the getter being decorated and copy its metadata."""
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__
        self.__module__ = fget.__module__
        return self

    def __get__(self, inst: Any, owner: Any) -> Any:
        """Return the cached value for ``inst``, recomputing it when the
        entry is missing or older than the TTL."""
        if inst is None:
            # Class-level access: there is no instance to cache on.
            return self
        now = time.time()
        try:
            value, last_update = inst._cached_properties[self.__name__]
        except (KeyError, AttributeError):
            # No cache dict on the instance yet, or no entry for this name.
            pass
        else:
            if self.ttl <= 0 or now - last_update <= self.ttl:
                return value
        value = self.fget(inst)
        try:
            cache = inst._cached_properties
        except AttributeError:
            # Bind the SAME dict to both names so the freshly computed
            # value is actually stored on the instance.
            cache = inst._cached_properties = {}
        cache[self.__name__] = (value, now)
        return value
47 changes: 30 additions & 17 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import socket
import logging
import argparse
import threading
import selectors
import multiprocessing
import multiprocessing.synchronize
Expand All @@ -29,7 +30,6 @@

from ..event import EventQueue

from ...common.constants import DEFAULT_SELECTOR_SELECT_TIMEOUT
from ...common.utils import is_threadless
from ...common.logger import Logger

Expand Down Expand Up @@ -87,8 +87,7 @@ def __init__(
self.event_queue = event_queue
# Index assigned by `AcceptorPool`
self.idd = idd
# Lock shared by all acceptor processes
# to avoid concurrent accept over server socket
# Mutex used for synchronization with acceptors
self.lock = lock
# Queue over which server socket fd is received on start-up
self.fd_queue: connection.Connection = fd_queue
Expand All @@ -106,14 +105,25 @@ def __init__(
self._total: int = 0

def run_once(self) -> None:
with self.lock:
assert self.selector and self.sock
events = self.selector.select(timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT)
if self.selector is not None:
events = self.selector.select(timeout=1)
if len(events) == 0:
return
conn, addr = self.sock.accept()
addr = None if addr == '' else addr
self._work(conn, addr)
locked = False
try:
if self.lock.acquire(block=False):
locked = True
for _, mask in events:
if mask & selectors.EVENT_READ:
if self.sock is not None:
conn, addr = self.sock.accept()
addr = None if addr == '' else addr
self._work(conn, addr)
except BlockingIOError:
pass
finally:
if locked:
self.lock.release()

def run(self) -> None:
Logger.setup(
Expand All @@ -134,7 +144,6 @@ def run(self) -> None:
try:
self.selector.register(self.sock, selectors.EVENT_READ)
while not self.running.is_set():
# logger.debug('Looking for new work')
self.run_once()
except KeyboardInterrupt:
pass
Expand All @@ -151,14 +160,18 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
# By default all acceptors will start sending work to
# 1st workers. To randomize, we offset index by idd.
index = (self._total + self.idd) % self.flags.num_workers
ThreadlessPool.delegate(
self.executor_pids[index],
self.executor_queues[index],
self.executor_locks[index],
conn,
addr,
self.flags.unix_socket_path,
thread = threading.Thread(
target=ThreadlessPool.delegate,
args=(
self.executor_pids[index],
self.executor_queues[index],
self.executor_locks[index],
conn,
addr,
self.flags.unix_socket_path,
),
)
thread.start()
logger.debug(
'Dispatched work#{0}.{1} to worker#{2}'.format(
self.idd, self._total, index,
Expand Down
8 changes: 3 additions & 5 deletions proxy/core/acceptor/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
from ...common.flag import flags
from ...common.constants import DEFAULT_NUM_ACCEPTORS

# Lock shared by acceptors for
# sequential acceptance of work.
LOCK = multiprocessing.Lock()

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -88,6 +84,8 @@ def __init__(
self.acceptors: List[Acceptor] = []
# Fd queues used to share file descriptor with acceptor processes
self.fd_queues: List[connection.Connection] = []
# Internals
self.lock = multiprocessing.Lock()

def __enter__(self) -> 'AcceptorPool':
self.setup()
Expand Down Expand Up @@ -126,7 +124,7 @@ def _start(self) -> None:
idd=acceptor_id,
fd_queue=work_queue[1],
flags=self.flags,
lock=LOCK,
lock=self.lock,
event_queue=self.event_queue,
executor_queues=self.executor_queues,
executor_pids=self.executor_pids,
Expand Down
58 changes: 28 additions & 30 deletions proxy/http/server/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ...common.constants import DEFAULT_ENABLE_STATIC_SERVER, DEFAULT_ENABLE_WEB_SERVER
from ...common.constants import DEFAULT_MIN_COMPRESSION_LIMIT, DEFAULT_WEB_ACCESS_LOG_FORMAT
from ...common.utils import bytes_, text_, build_http_response, build_websocket_handshake_response
from ...common.utils import cached_property
from ...common.types import Readables, Writables
from ...common.flag import flags

Expand Down Expand Up @@ -110,11 +111,6 @@ def __init__(
self.start_time: float = time.time()
self.pipeline_request: Optional[HttpParser] = None
self.switched_protocol: Optional[int] = None
self.routes: Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]] = {
httpProtocolTypes.HTTP: {},
httpProtocolTypes.HTTPS: {},
httpProtocolTypes.WEBSOCKET: {},
}
self.route: Optional[HttpWebServerBasePlugin] = None

self.plugins: Dict[str, HttpWebServerBasePlugin] = {}
Expand All @@ -127,8 +123,18 @@ def __init__(
self.event_queue,
)
self.plugins[instance.name()] = instance
for (protocol, route) in instance.routes():
self.routes[protocol][re.compile(route)] = instance

@cached_property(ttl=0)
def routes(self) -> Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]]:
    """Map each protocol type to its compiled route patterns and the
    plugin instance that registered them.

    Wrapped with ``cached_property(ttl=0)``, i.e. cached without expiry.
    """
    table: Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]] = {
        protocol_type: {}
        for protocol_type in (
            httpProtocolTypes.HTTP,
            httpProtocolTypes.HTTPS,
            httpProtocolTypes.WEBSOCKET,
        )
    }
    # Walk plugins in registration order; a later plugin that declares
    # an identical pattern overrides the earlier one.
    for plugin in self.plugins.values():
        for protocol, pattern in plugin.routes():
            table[protocol][re.compile(pattern)] = plugin
    return table

def encryption_enabled(self) -> bool:
return self.flags.keyfile is not None and \
Expand Down Expand Up @@ -186,42 +192,35 @@ def try_upgrade(self) -> bool:
def on_request_complete(self) -> Union[socket.socket, bool]:
if self.request.has_host():
return False

path = self.request.path or b'/'

# Routing for Http(s) requests
protocol = httpProtocolTypes.HTTPS \
if self.encryption_enabled() else \
httpProtocolTypes.HTTP
for route in self.routes[protocol]:
if route.match(text_(path)):
self.route = self.routes[protocol][route]
assert self.route
self.route.handle_request(self.request)
if self.request.has_header(b'connection') and \
self.request.header(b'connection').lower() == b'close':
return True
return False
# If a websocket route exists for the path, try upgrade
for route in self.routes[httpProtocolTypes.WEBSOCKET]:
match = route.match(text_(path))
if match:
if route.match(text_(path)):
self.route = self.routes[httpProtocolTypes.WEBSOCKET][route]

# Connection upgrade
teardown = self.try_upgrade()
if teardown:
return True

# For upgraded connections, nothing more to do
if self.switched_protocol:
# Invoke plugin.on_websocket_open
assert self.route
self.route.on_websocket_open()
return False

break

# Routing for Http(s) requests
protocol = httpProtocolTypes.HTTPS \
if self.encryption_enabled() else \
httpProtocolTypes.HTTP
for route in self.routes[protocol]:
match = route.match(text_(path))
if match:
self.route = self.routes[protocol][route]
self.route.handle_request(self.request)
if self.request.has_header(b'connection') and \
self.request.header(b'connection').lower() == b'close':
return True
return False

# No-route found, try static serving if enabled
if self.flags.enable_static_server:
path = text_(path).split('?')[0]
Expand All @@ -232,7 +231,6 @@ def on_request_complete(self) -> Union[socket.socket, bool]:
),
)
return True

# Catch all unhandled web server requests, return 404
self.client.queue(self.DEFAULT_404_RESPONSE)
return True
Expand Down
Loading

0 comments on commit 8fdddfd

Please sign in to comment.