Skip to content

Commit

Permalink
Pool (#694)
Browse files Browse the repository at this point in the history
* Refactor pool

* mypy fixes

* Fix import (relative)

* Add WebScraper example skeleton & ConnectionPool skeleton

* Add ConnectionPool class

* Integrate ConnectionPool with proxy server (experimental)

* Lint fixes

* Remove unused imports. TODO: Put pool behind a flag. Default to false for now

* Make ConnectionPool multiprocess safe.  Later we want to make it safe but without using locks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Remove unused imports

* Return created flag from acquire

* Guard connection pool behind --enable-conn-pool flag

* Flag belongs within connection pool class

* spelling

* self.upstream = None only for pool config

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
abhinavsingh and pre-commit-ci[bot] authored Nov 7, 2021
1 parent da23c7f commit d3cee32
Show file tree
Hide file tree
Showing 13 changed files with 414 additions and 76 deletions.
24 changes: 18 additions & 6 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Looking for `proxy.py` plugin examples? Check [proxy/plugin](https://github.com

Table of Contents
=================
* [Generic Work Acceptor and Executor](#generic-work-acceptor-and-executor)
* [WebSocket Client](#websocket-client)
* [TCP Echo Server](#tcp-echo-server)
* [TCP Echo Client](#tcp-echo-client)
Expand All @@ -14,6 +15,17 @@ Table of Contents
* [PubSub Eventing](#pubsub-eventing)
* [Https Connect Tunnel](#https-connect-tunnel)

## Generic Work Acceptor and Executor

1. Makes use of `proxy.core.AcceptorPool` and `proxy.core.Work`
2. Demonstrates how to perform generic work using `proxy.py` core.

Start `web_scraper.py` as:

```console
PYTHONPATH=. python examples/web_scraper.py
```

## WebSocket Client

1. Makes use of `proxy.http.websocket.WebsocketClient` which is built on-top of `asyncio`
Expand All @@ -22,7 +34,7 @@ Table of Contents

Start `websocket_client.py` as:

```bash
```console
PYTHONPATH=. python examples/websocket_client.py
Received b'hello' after 306 millisec
Received b'hello' after 308 millisec
Expand All @@ -44,7 +56,7 @@ Received b'hello' after 309 millisec

Start `tcp_echo_server.py` as:

```bash
```console
PYTHONPATH=. python examples/tcp_echo_server.py
Connection accepted from ('::1', 53285, 0, 0)
Connection closed by client ('::1', 53285, 0, 0)
Expand All @@ -57,7 +69,7 @@ Connection closed by client ('::1', 53285, 0, 0)

Start `tcp_echo_client.py` as:

```bash
```console
PYTHONPATH=. python examples/tcp_echo_client.py
b'hello'
b'hello'
Expand All @@ -81,7 +93,7 @@ KeyboardInterrupt

Start `ssl_echo_server.py` as:

```bash
```console
PYTHONPATH=. python examples/ssl_echo_server.py
```

Expand All @@ -92,7 +104,7 @@ Start `ssl_echo_server.py` as:

Start `ssl_echo_client.py` as:

```bash
```console
PYTHONPATH=. python examples/ssl_echo_client.py
```

Expand All @@ -107,7 +119,7 @@ Start `ssl_echo_client.py` as:

Start `pubsub_eventing.py` as:

```bash
```console
PYTHONPATH=. python examples/pubsub_eventing.py
DEBUG:proxy.core.event.subscriber:Subscribed relay sub id 5eb22010764f4d44900f41e2fb408ca6 from core events
publisher starting
Expand Down
70 changes: 70 additions & 0 deletions examples/web_scraper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time
import socket

from typing import Dict

from proxy.proxy import Proxy
from proxy.core.acceptor import Work, AcceptorPool
from proxy.common.types import Readables, Writables


class WebScraper(Work):
"""Demonstrates how to orchestrate a generic work acceptors and executors
workflow using proxy.py core.
By default, `WebScraper` expects to receive work from a file on disk.
Each line in the file must be a URL to scrape. Received URL is scrapped
by the implementation in this class.
After scrapping, results are published to the eventing core. One or several
result subscriber can then handle the result as necessary. Currently, result
subscribers consume the scrapped response and write discovered URL in the
file on the disk. This creates a feedback loop. Allowing WebScraper to
continue endlessly.
NOTE: No loop detection is performed currently.
NOTE: File descriptor need not point to a file on disk.
Example, file descriptor can be a database connection.
For simplicity, imagine a Redis server connection handling
only PUBSUB protocol.
"""

def get_events(self) -> Dict[socket.socket, int]:
"""Return sockets and events (read or write) that we are interested in."""
return {}

def handle_events(
self,
readables: Readables,
writables: Writables,
) -> bool:
"""Handle readable and writable sockets.
Return True to shutdown work."""
return False


if __name__ == '__main__':
with AcceptorPool(
flags=Proxy.initialize(
port=12345,
num_workers=1,
threadless=True,
keyfile='https-key.pem',
certfile='https-signed-cert.pem',
),
work_klass=WebScraper,
) as pool:
while True:
time.sleep(1)
8 changes: 4 additions & 4 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ def __init__(
) -> None:
super().__init__()
self.flags = flags
# Eventing core queue
self.event_queue = event_queue
# Index assigned by `AcceptorPool`
self.idd = idd
# Lock shared by all acceptor processes
# to avoid concurrent accept over server socket
self.lock = lock
# Index assigned by `AcceptorPool`
self.idd = idd
# Queue over which server socket fd is received on start-up
self.work_queue: connection.Connection = work_queue
# Worker class
self.work_klass = work_klass
# Eventing core queue
self.event_queue = event_queue
# Selector & threadless states
self.running = multiprocessing.Event()
self.selector: Optional[selectors.DefaultSelector] = None
Expand Down
90 changes: 48 additions & 42 deletions proxy/core/acceptor/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
from ..event import EventQueue

from ...common.flag import flags
from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME, DEFAULT_NUM_WORKERS, DEFAULT_PORT
from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME
from ...common.constants import DEFAULT_NUM_WORKERS, DEFAULT_PORT

logger = logging.getLogger(__name__)

# Lock shared by worker processes
# Lock shared by acceptors for
# sequential acceptance of work.
LOCK = multiprocessing.Lock()


Expand Down Expand Up @@ -61,20 +63,18 @@


class AcceptorPool:
"""AcceptorPool pre-spawns worker processes to utilize all cores available on the system.
A server socket is initialized and dispatched over a pipe to these workers.
Each worker process then concurrently accepts new client connection over
the initialized server socket.
"""AcceptorPool is a helper class which pre-spawns `Acceptor` processes
to utilize all available CPU cores for accepting new work.
A file descriptor to consume work from is shared with `Acceptor` processes
over a pipe. Each `Acceptor` process then concurrently accepts new work over
the shared file descriptor.
Example usage:
pool = AcceptorPool(flags=..., work_klass=...)
try:
pool.setup()
with AcceptorPool(flags=..., work_klass=...) as pool:
while True:
time.sleep(1)
finally:
pool.shutdown()
`work_klass` must implement `work.Work` class.
"""
Expand All @@ -84,11 +84,16 @@ def __init__(
work_klass: Type[Work], event_queue: Optional[EventQueue] = None,
) -> None:
self.flags = flags
# Eventing core queue
self.event_queue: Optional[EventQueue] = event_queue
# File descriptor to use for accepting new work
self.socket: Optional[socket.socket] = None
# Acceptor process instances
self.acceptors: List[Acceptor] = []
# Work queue used to share file descriptor with acceptor processes
self.work_queues: List[connection.Connection] = []
# Work class implementation
self.work_klass = work_klass
self.event_queue: Optional[EventQueue] = event_queue

def __enter__(self) -> 'AcceptorPool':
self.setup()
Expand All @@ -102,19 +107,43 @@ def __exit__(
) -> None:
self.shutdown()

def listen(self) -> None:
def setup(self) -> None:
"""Listen on port and setup acceptors."""
self._listen()
# Override flags.port to match the actual port
# we are listening upon. This is necessary to preserve
# the server port when `--port=0` is used.
assert self.socket
self.flags.port = self.socket.getsockname()[1]
self._start_acceptors()
# Send file descriptor to all acceptor processes.
assert self.socket is not None
for index in range(self.flags.num_workers):
send_handle(
self.work_queues[index],
self.socket.fileno(),
self.acceptors[index].pid,
)
self.work_queues[index].close()
self.socket.close()

def shutdown(self) -> None:
logger.info('Shutting down %d workers' % self.flags.num_workers)
for acceptor in self.acceptors:
acceptor.running.set()
for acceptor in self.acceptors:
acceptor.join()
logger.debug('Acceptors shutdown')

def _listen(self) -> None:
self.socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((str(self.flags.hostname), self.flags.port))
self.socket.listen(self.flags.backlog)
self.socket.setblocking(False)
# Override flags.port to match the actual port
# we are listening upon. This is necessary to preserve
# the server port when `--port=0` is used.
self.flags.port = self.socket.getsockname()[1]

def start_workers(self) -> None:
"""Start worker processes."""
def _start_acceptors(self) -> None:
"""Start acceptor processes."""
for acceptor_id in range(self.flags.num_workers):
work_queue = multiprocessing.Pipe()
acceptor = Acceptor(
Expand All @@ -134,26 +163,3 @@ def start_workers(self) -> None:
self.acceptors.append(acceptor)
self.work_queues.append(work_queue[0])
logger.info('Started %d workers' % self.flags.num_workers)

def shutdown(self) -> None:
logger.info('Shutting down %d workers' % self.flags.num_workers)
for acceptor in self.acceptors:
acceptor.running.set()
for acceptor in self.acceptors:
acceptor.join()
logger.debug('Acceptors shutdown')

def setup(self) -> None:
"""Listen on port, setup workers and pass server socket to workers."""
self.listen()
self.start_workers()
# Send server socket to all acceptor processes.
assert self.socket is not None
for index in range(self.flags.num_workers):
send_handle(
self.work_queues[index],
self.socket.fileno(),
self.acceptors[index].pid,
)
self.work_queues[index].close()
self.socket.close()
4 changes: 2 additions & 2 deletions proxy/core/base/tcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from abc import abstractmethod
from typing import Dict, Any, Optional

from proxy.core.acceptor import Work
from proxy.common.types import Readables, Writables
from ...core.acceptor import Work
from ...common.types import Readables, Writables

logger = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion proxy/core/base/tcp_tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from abc import abstractmethod
import socket
import selectors

from abc import abstractmethod
from typing import Any, Optional, Dict

from ...http.parser import HttpParser, httpParserTypes
Expand Down
2 changes: 2 additions & 0 deletions proxy/core/connection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
from .connection import TcpConnection, TcpConnectionUninitializedException, tcpConnectionTypes
from .client import TcpClientConnection
from .server import TcpServerConnection
from .pool import ConnectionPool

__all__ = [
'TcpConnection',
'TcpConnectionUninitializedException',
'TcpServerConnection',
'TcpClientConnection',
'tcpConnectionTypes',
'ConnectionPool',
]
2 changes: 1 addition & 1 deletion proxy/core/connection/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(
self,
conn: Union[ssl.SSLSocket, socket.socket],
addr: Tuple[str, int],
):
) -> None:
super().__init__(tcpConnectionTypes.CLIENT)
self._conn: Optional[Union[ssl.SSLSocket, socket.socket]] = conn
self.addr: Tuple[str, int] = addr
Expand Down
19 changes: 16 additions & 3 deletions proxy/core/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ class TcpConnection(ABC):
when reading and writing into the socket.
Implement the connection property abstract method to return
a socket connection object."""
a socket connection object.
"""

def __init__(self, tag: int):
def __init__(self, tag: int) -> None:
self.tag: str = 'server' if tag == tcpConnectionTypes.SERVER else 'client'
self.buffer: List[memoryview] = []
self.closed: bool = False
self.tag: str = 'server' if tag == tcpConnectionTypes.SERVER else 'client'
self._reusable: bool = False

@property
@abstractmethod
Expand Down Expand Up @@ -95,3 +97,14 @@ def flush(self) -> int:
del mv
logger.debug('flushed %d bytes to %s' % (sent, self.tag))
return sent

def is_reusable(self) -> bool:
return self._reusable

def mark_inuse(self) -> None:
self._reusable = False

def reset(self) -> None:
assert not self.closed
self._reusable = True
self.buffer = []
Loading

0 comments on commit d3cee32

Please sign in to comment.