Skip to content

Commit

Permalink
Channel and connection refactor (#35)
Browse files Browse the repository at this point in the history
* should be possible to configure ssl on dependencyprovider with kwarg

* bump version number

* refactor to DRY out client code

* checkpoint: refactor client connection into channel

* checkpoint: client connection pool with reconnection

* DRY client out again with common spawn thread

* also move ServerConnectionManager to connection module

* handle connection terminated events

* static fixes

* simplequeue only available since 3.7

* tidyup

* add server channel to mirror client channel refactor

* static fixes

* unused import

* channel and connection tidyup

- use timeout for initial client connection
- use timeout for send/recv on client and server connection sockets
- wait for connections to stop before stopping server channel
 (via new serverconnectionpool)
- use context manager to track connection dead-ness

* whitespace

* target_with_callback should raise if target raises

* bump to 1.2.0

* remove ill-conceived target_with_callback

* add logging when connections are terminated

* minor tweak: skip tests when server is not compatible

(we were running the test with a compatible server and the wrong label)

* better tweak: override the parametrized fixture rather than skipping

* have to stop dp clients during fixture teardown

otherwise they try to reconnect as the server is being stopped and
produce a traceback in the logs

* explicitly handle ProtocolError to avoid later traceback

* static fixes

* bump version

* bug: thread should exit after timeout reached

* small tweak to unit test

generate the error as part of the stream rather than afterwards, so
that close is only called once

* close the timeout thread early if all streams closed

* close the timeout thread early if all streams closed

(entrypoint edition)

* remove receive streams from connection state when closed

* slightly nicer thread idents

* more robust client reconnection

- ensure all streams are closed on exit
- simpler error handling, catching all exceptions
- nicer thread idents

* static fixes

* bump version

* drop py3.5

* add coverage to makefile

* test on py38 and py39

* static fixes

* improved explanation in doctring
  • Loading branch information
mattbennett authored Feb 4, 2021
1 parent fe51b53 commit d4d13dc
Show file tree
Hide file tree
Showing 15 changed files with 541 additions and 366 deletions.
11 changes: 11 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[run]
concurrency = eventlet
branch = 0
source =
nameko_grpc
test
omit =
test/grpc_indirect*.py
[report]
show_missing = true
fail_under = 100
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ __pycache__
.tox
*_pb2.py
*_pb2_grpc.py
.coverage
11 changes: 6 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@ matrix:
- stage: static
python: 3.6
env: TOX_ENV=static
- stage: test
python: 3.5
env: TOX_ENV=py35-test
- stage: test
python: 3.6
env: TOX_ENV=py36-test
- stage: test
python: 3.7
# dist: xenial
sudo: true
env: TOX_ENV=py37-test
- stage: test
python: 3.8
env: TOX_ENV=py38-test
- stage: test
python: 3.9
env: TOX_ENV=py39-test
- stage: deploy
script: skip
deploy:
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ static:
pre-commit run --all-files

test:
py.test test -v
py.test test -v

coverage:
coverage run -m pytest test -v
coverage report
166 changes: 166 additions & 0 deletions nameko_grpc/channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
import queue
import socket
from logging import getLogger
from urllib.parse import urlparse

import eventlet

from nameko_grpc.connection import ClientConnectionManager, ServerConnectionManager


log = getLogger(__name__)

CONNECT_TIMEOUT = 5


class ClientConnectionPool:
""" Simple connection pool for clients.
Accepts a list of targets and will maintain a connection to each of them,
round-robining requests between them.
Currently expects each target to be a valid argument to `urllib.parse.urlparse`.
TODO should be accepting something more strict, something like:
target:
hostname: for ssl verification
ip_address:
port:
service config?
"""

def __init__(self, targets, ssl, spawn_thread):
self.targets = targets
self.ssl = ssl
self.spawn_thread = spawn_thread

self.connections = queue.Queue()

def connect(self, target):
sock = socket.create_connection(
(target.hostname, target.port or 50051), timeout=CONNECT_TIMEOUT
)

if self.ssl:
context = self.ssl.client_context()
sock = context.wrap_socket(
sock=sock, server_hostname=target.hostname, suppress_ragged_eofs=True
)

sock.settimeout(60) # XXX needed and/or correct value?
connection = ClientConnectionManager(sock)
self.connections.put(connection)

def run_with_reconnect():
connection.run_forever()
if self.run:
self.connect(target)

self.spawn_thread(
target=run_with_reconnect, name=f"grpc client connection [{target}]"
)

def get(self):
while True:
conn = self.connections.get()
if conn.alive:
self.connections.put(conn)
return conn

def start(self):
self.run = True
for target in self.targets:
self.connect(urlparse(target))

def stop(self):
self.run = False
while not self.connections.empty():
self.connections.get().stop()


class ClientChannel:
""" Simple client channel.
Channels could eventually suppport pluggable resolvers and load-balancing.
"""

def __init__(self, target, ssl, spawn_thread):
self.conn_pool = ClientConnectionPool([target], ssl, spawn_thread)

def start(self):
self.conn_pool.start()

def stop(self):
self.conn_pool.stop()

def send_request(self, request_headers):
return self.conn_pool.get().send_request(request_headers)


class ServerConnectionPool:
""" Simple connection pool for servers.
Just accepts new connections and allows them to run until close.
"""

def __init__(self, host, port, ssl, spawn_thread, handle_request):
self.host = host
self.port = port
self.ssl = ssl
self.spawn_thread = spawn_thread
self.handle_request = handle_request

self.connections = queue.Queue()

def listen(self):
sock = eventlet.listen((self.host, self.port))
sock.settimeout(None)

if self.ssl:
context = self.ssl.server_context()
sock = context.wrap_socket(
sock=sock, server_side=True, suppress_ragged_eofs=True,
)

return sock

def run(self):
while self.is_accepting:
sock, _ = self.listening_socket.accept()
sock.settimeout(60) # XXX needed and/or correct value?

connection = ServerConnectionManager(sock, self.handle_request)
self.connections.put(connection)
self.spawn_thread(
target=connection.run_forever, name=f"grpc server connection [{sock}]"
)

def start(self):
self.listening_socket = self.listen()
self.is_accepting = True
self.spawn_thread(
target=self.run, name=f"grpc server accept [{self.listening_socket}]"
)

def stop(self):
self.is_accepting = False
while not self.connections.empty():
self.connections.get().stop()
self.listening_socket.close()


class ServerChannel:
""" Simple server channel encapsulating incoming connection management.
"""

def __init__(self, host, port, ssl, spawn_thread, handle_request):
self.conn_pool = ServerConnectionPool(
host, port, ssl, spawn_thread, handle_request
)

def start(self):
self.conn_pool.start()

def stop(self):
self.conn_pool.stop()
Loading

0 comments on commit d4d13dc

Please sign in to comment.