Skip to content

Commit

Permalink
Is207/reverse proxy webserver (#318)
Browse files Browse the repository at this point in the history
- 1st version of the reverse proxy subsystem (see ``src/simcore_service_webserver/reverse_proxy``)
   - configurable upon setup
   - well decoupled
   - unit tests in ``tests/unit/test_reverse_proxy.py``
- customized handlers : default, jupyter and preview (drafts)
- still not integrated w/  other subsystems in webserver (will do in separated pull-request)
- connected to #207
  • Loading branch information
pcrespov authored Nov 13, 2018
1 parent 04fa857 commit a6f9444
Show file tree
Hide file tree
Showing 14 changed files with 888 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def __registry_request(path, method="GET"):
# r = s.get(api_url, verify=False) #getattr(s, method.lower())(api_url)
request_result = getattr(_SESSION, method.lower())(api_url)
_logger.info("Request status: %s",request_result.status_code)
if request_result.status_code > 399:
if request_result.status_code > 399:
request_result.raise_for_status()

return request_result
Expand Down
41 changes: 41 additions & 0 deletions services/web/server/src/simcore_service_webserver/director.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
""" director - subsystem that communicates with director service
"""

import logging

from aiohttp import web

from . import director_config

logger = logging.getLogger(__name__)


# SETTINGS ----------------------------------------------------
THIS_MODULE_NAME = __name__.split(".")[-1]

# --------------------------------------------------------------



def setup(app: web.Application):
"""Setup the directory sub-system in the application a la aiohttp fashion
"""
logger.debug("Setting up %s ...", __name__)

_cfg = director_config.get_from(app)

# TODO: create instance of director's client-sdk

# TODO: inject in application




# alias
setup_director = setup

__all__ = (
'setup_director'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
""" director - subsystem's configuration
- defines schema for this subsystem's section in configuration file
- helpers functions to get/set configuration from app configuration
TODO: add validation, get/set app config
"""
from typing import Dict

import trafaret as T
from aiohttp import web

from .application_keys import APP_CONFIG_KEY


THIS_SERVICE_NAME = 'director'


schema = T.Dict({
T.Key("host", default=THIS_SERVICE_NAME): T.String(),
"port": T.Int()
})


def get_from(app: web.Application) -> Dict:
""" Gets section from application's config
"""
return app[APP_CONFIG_KEY][THIS_SERVICE_NAME]



# alias
DIRECTOR_SERVICE = THIS_SERVICE_NAME
director_schema = schema


__all__ = (
"DIRECTOR_SERVICE",
"director_schema"
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
""" Manages lifespan of interactive services.
- uses director's client-sdk to communicate with the director service
"""
# pylint: disable=W0703
# pylint: disable=C0111
Expand All @@ -20,6 +22,10 @@ def session_connect(session_id):


async def session_disconnected(session_id):
""" Stops all running services when session disconnects
TODO: rename on_session_disconnected because is a reaction to that event
"""
log.debug("Session disconnection of session %s", session_id)
try:
director = director_sdk.get_director()
Expand Down Expand Up @@ -50,6 +56,12 @@ async def retrieve_list_of_services():


async def start_service(session_id, service_key, service_uuid, service_version=None):
""" Starts a service registered in the container's registry
:param str service_key: The key (url) of the service (required)
:param str service_uuid: The uuid to assign the service with (required)
:param str service_version: The tag/version of the service
"""
if not service_version:
service_version = "latest"
log.debug("Starting service %s:%s with uuid %s", service_key, service_version, service_uuid)
Expand All @@ -68,6 +80,10 @@ async def start_service(session_id, service_key, service_uuid, service_version=N


async def stop_service(session_id, service_uuid):
""" Stops and removes a running service
:param str service_uuid: The uuid to assign the service with (required)
"""
log.debug("Stopping service with uuid %s", service_uuid)
try:
director = director_sdk.get_director()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
""" reverse proxy subsystem
Dynamically reroutes communication between web-server client and dynamic-backend services (or dyb's)
Use case
- All requests to `/x/{serviceId}/{proxyPath}` are re-routed to resolved dyb service
- dy-services are managed by the director service who monitors and controls its lifetime
- a client-sdk to query the director is passed upon setup
- Customized reverse proxy handlers for dy-jupyter, dy-modeling and dy-3dvis
"""
import logging

from aiohttp import web

from .abc import ServiceResolutionPolicy
from .routing import ReverseChooser
from .handlers import jupyter, paraview
from .settings import URL_PATH

logger = logging.getLogger(__name__)

MODULE_NAME = __name__.split(".")[-1]


def setup(app: web.Application, service_resolver: ServiceResolutionPolicy):
"""Sets up reverse-proxy subsystem in the application (a la aiohttp)
"""
logger.debug("Setting up %s ...", __name__)

chooser = ReverseChooser(resolver=service_resolver)

# Registers reverse proxy handlers customized for specific service types
chooser.register_handler(jupyter.handler,
image_name=jupyter.SUPPORTED_IMAGE_NAME)

chooser.register_handler(paraview.handler,
image_name=paraview.SUPPORTED_IMAGE_NAME)

# /x/{serviceId}/{proxyPath:.*}
app.router.add_route(method='*', path=URL_PATH,
handler=chooser.do_route, name=MODULE_NAME)

# chooser has same lifetime as the application
app[__name__] = {"chooser": chooser}


# alias
setup_reverse_proxy = setup

__all__ = (
'setup_reverse_proxy'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

import abc

from yarl import URL

from .settings import PROXY_MOUNTPOINT


class ServiceResolutionPolicy(metaclass=abc.ABCMeta):
""" Implements an interface to identify and
resolve the location of a dynamic backend service
"""
base_mountpoint = PROXY_MOUNTPOINT

@abc.abstractmethod
async def get_image_name(self, service_identifier: str) -> str:
"""
Identifies a type of service. This normally corresponds
to the name of the docker image
"""
pass

@abc.abstractmethod
async def find_url(self, service_identifier: str) -> URL:
"""
Return the complete url (including the mountpoint) of
the service in the backend
This access should be accesible by the proxy server
E.g. 'http://127.0.0.1:58873/x/ae1q8/'
"""
pass

# TODO: on_closed signal to notify sub-system that the service
# has closed and can raise HTTPServiceAnavailable
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
""" Handlers customized for services
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
""" Default implementation of reverse-proxy
TODO: https://stackoverflow.com/questions/46788964/trying-to-build-a-proxy-with-aiohttp
TODO: https://github.com/weargoggles/aioproxy/blob/master/aioproxy.py
- another possibility: always request director and thisone will redirect to real server...
CONS: will double #calls
PROS: location of the dyb service can change at will!
"""
import logging
import time

import aiohttp
from aiohttp import web

from yarl import URL
logger = logging.getLogger(__name__)


CHUNK = 32768


async def handler(request: web.Request, service_url: str, **_kwargs) -> web.StreamResponse:
# FIXME: Taken tmp from https://github.com/weargoggles/aioproxy/blob/master/aioproxy.py
start = time.time()
try:
# FIXME: service_url should be service_endpoint or service_origins
tarfind_url = URL(service_url).origin().with_path(
request.path).with_query(request.query)
async with aiohttp.client.request(
request.method, tarfind_url,
headers=request.headers,
chunked=CHUNK,
# response_class=ReverseProxyResponse,
) as r:
logger.debug('opened backend request in %d ms',
((time.time() - start) * 1000))
response = aiohttp.web.StreamResponse(status=r.status,
headers=r.headers)
await response.prepare(request)
content = r.content
while True:
chunk = await content.read(CHUNK)
if not chunk:
break
await response.write(chunk)

logger.debug('finished sending content in %d ms',
((time.time() - start) * 1000,))
await response.write_eof()
return response
except Exception:
logger.debug("reverse proxy %s", request, exec_info=True)
raise web.HTTPServiceUnavailable(reason="Cannot talk to spawner",
content_type="application/json")

# except web.HttpStatus as status:
# return status.as_response()
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
""" Reverse-proxy customized for jupyter notebooks
TODO: document
"""

import asyncio
import logging
import pprint

import aiohttp
from aiohttp import client, web

# TODO: find actual name in registry
SUPPORTED_IMAGE_NAME = "jupyter"
SUPPORTED_IMAGE_TAG = "==0.1.0"

logger = logging.getLogger(__name__)


async def handler(req: web.Request, service_url: str, **_kwargs) -> web.StreamResponse:
# Resolved url pointing to backend jupyter service
tarfind_url = service_url + req.path_qs

reqH = req.headers.copy()
if reqH['connection'] == 'Upgrade' and reqH['upgrade'] == 'websocket' and req.method == 'GET':

ws_server = web.WebSocketResponse()
await ws_server.prepare(req)
logger.info('##### WS_SERVER %s', pprint.pformat(ws_server))

client_session = aiohttp.ClientSession(cookies=req.cookies)
async with client_session.ws_connect(
tarfind_url,
) as ws_client:
logger.info('##### WS_CLIENT %s', pprint.pformat(ws_client))

async def ws_forward(ws_from, ws_to):
async for msg in ws_from:
logger.info('>>> msg: %s', pprint.pformat(msg))
mt = msg.type
md = msg.data
if mt == aiohttp.WSMsgType.TEXT:
await ws_to.send_str(md)
elif mt == aiohttp.WSMsgType.BINARY:
await ws_to.send_bytes(md)
elif mt == aiohttp.WSMsgType.PING:
await ws_to.ping()
elif mt == aiohttp.WSMsgType.PONG:
await ws_to.pong()
elif ws_to.closed:
await ws_to.close(code=ws_to.close_code, message=msg.extra)
else:
raise ValueError(
'unexpected message type: %s' % pprint.pformat(msg))

await asyncio.wait([ws_forward(ws_server, ws_client), ws_forward(ws_client, ws_server)], return_when=asyncio.FIRST_COMPLETED)

return ws_server
else:

async with client.request(
req.method, tarfind_url,
headers=reqH,
allow_redirects=False,
data=await req.read()
) as res:
headers = res.headers.copy()
body = await res.read()
return web.Response(
headers=headers,
status=res.status,
body=body
)
return ws_server


if __name__ == "__main__":
# dummies for manual testing
BASE_URL = 'http://0.0.0.0:8888'
MOUNT_POINT = '/x/fakeUuid'

def adapter(req: web.Request):
return handler(req, service_url=BASE_URL)

app = web.Application()
app.router.add_route('*', MOUNT_POINT + '/{proxyPath:.*}', adapter)
web.run_app(app, port=3984)
Loading

0 comments on commit a6f9444

Please sign in to comment.