Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Is207/reverse proxy webserver #318

Merged
merged 56 commits into from
Nov 13, 2018
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
2741834
Fixes typos in client after bad merge
Jul 6, 2018
9646035
Merge remote-tracking branch 'upstream/master'
Jul 6, 2018
c3ad81c
Merge remote-tracking branch 'upstream/master'
Jul 6, 2018
d7b91f7
Merge remote-tracking branch 'upstream/master'
Jul 10, 2018
a65aa8e
Merge remote-tracking branch 'upstream/master'
Jul 11, 2018
4edc12a
Merge remote-tracking branch 'upstream/master'
Jul 16, 2018
460bdea
Merge remote-tracking branch 'upstream/master'
Jul 16, 2018
408990d
Merge remote-tracking branch 'upstream/master'
Jul 18, 2018
ee1b91b
Merge remote-tracking branch 'upstream/master'
Jul 20, 2018
cf7b3f4
Merge remote-tracking branch 'upstream/master'
Aug 9, 2018
3f4eed8
Merge remote-tracking branch 'upstream/master'
Aug 22, 2018
14cbec2
Merge remote-tracking branch 'upstream/master'
Aug 23, 2018
b57188b
Merge remote-tracking branch 'upstream/master'
Sep 3, 2018
66e5cdc
Merge remote-tracking branch 'upstream/master'
Sep 4, 2018
31aeb8f
Merge remote-tracking branch 'upstream/master'
Sep 13, 2018
c0dbe36
Merge remote-tracking branch 'upstream/master'
Sep 18, 2018
81111e0
Merge remote-tracking branch 'upstream/master'
Sep 19, 2018
6b4c1bf
Merge remote-tracking branch 'upstream/master'
Sep 24, 2018
bbe7558
Merge remote-tracking branch 'upstream/master'
Sep 25, 2018
fd2befe
Merge remote-tracking branch 'upstream/master'
Oct 10, 2018
6b0dd8f
Merge remote-tracking branch 'upstream/master'
Oct 12, 2018
b706243
Merge remote-tracking branch 'upstream/master'
Oct 17, 2018
0bf32b8
Merge remote-tracking branch 'upstream/master'
Oct 17, 2018
d0664c9
Merge remote-tracking branch 'upstream/master'
Oct 19, 2018
5e895e0
Merge remote-tracking branch 'upstream/master'
Oct 19, 2018
90eb8f4
Merge remote-tracking branch 'upstream/master'
Oct 19, 2018
dc90291
Merge remote-tracking branch 'upstream/master'
Oct 19, 2018
152d2cc
Merge remote-tracking branch 'upstream/master'
pcrespov Oct 22, 2018
4628a6e
Merge remote-tracking branch 'upstream/master'
pcrespov Oct 23, 2018
9c49147
Merge remote-tracking branch 'upstream/master'
Oct 24, 2018
21aeab8
Merge branch 'master' of github.com:pcrespov/osparc-simcore
Oct 24, 2018
62eb5c5
Merge remote-tracking branch 'upstream/master'
Oct 26, 2018
65f510c
Merge remote-tracking branch 'upstream/master'
Oct 29, 2018
9e40d37
Merge remote-tracking branch 'upstream/master'
Oct 30, 2018
77d6ac1
Merge remote-tracking branch 'upstream/master'
pcrespov Oct 31, 2018
e733846
Merge branch 'master' of github.com:pcrespov/osparc-simcore
Oct 31, 2018
6ca6aad
Merge remote-tracking branch 'upstream/master'
Oct 31, 2018
d423dcc
Merge remote-tracking branch 'upstream/master'
Nov 2, 2018
5e1969e
WIP
Nov 2, 2018
0fd2975
Merge remote-tracking branch 'upstream/master'
Nov 5, 2018
4a8c093
Merge remote-tracking branch 'upstream/master'
Nov 7, 2018
d3ad6e5
Merge branch 'master' into is207/reverse-proxy-webserver
Nov 7, 2018
fcd970f
Minor clenaup
Nov 7, 2018
00a715d
Adds a director package to setup/config subsystem
Nov 7, 2018
c21cadb
Added first draft of reserve-proxy sub-system
Nov 7, 2018
c1a4742
Simplified reverse proxy interface:
Nov 8, 2018
ee1981a
spawmer service test pass
Nov 8, 2018
5064dd8
Created named resource for reverse_proxy
Nov 8, 2018
6b0d357
tests calling spawmer server throw reverse proxy server
Nov 8, 2018
671b0b7
tests client communicating with spawner and swapned servers via the r…
Nov 8, 2018
af48ccb
Merge branch 'master' into is207/reverse-proxy-webserver
pcrespov Nov 8, 2018
bd32f40
Removed unused file
pcrespov Nov 8, 2018
2a00149
Fixed caching
pcrespov Nov 8, 2018
777e5be
Fixed typo in app key
pcrespov Nov 8, 2018
e0f9ef0
Merge branch 'master' into is207/reverse-proxy-webserver
pcrespov Nov 13, 2018
0684faf
Merge branch 'master' into is207/reverse-proxy-webserver
sanderegg Nov 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def __registry_request(path, method="GET"):
# r = s.get(api_url, verify=False) #getattr(s, method.lower())(api_url)
request_result = getattr(_SESSION, method.lower())(api_url)
_logger.info("Request status: %s",request_result.status_code)
if request_result.status_code > 399:
if request_result.status_code > 399:
request_result.raise_for_status()

return request_result
Expand Down
41 changes: 41 additions & 0 deletions services/web/server/src/simcore_service_webserver/director.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
""" director - subsystem that communicates with director service

"""

import logging

from aiohttp import web

from . import director_config

logger = logging.getLogger(__name__)


# SETTINGS ----------------------------------------------------
THIS_MODULE_NAME = __name__.split(".")[-1]

# --------------------------------------------------------------



def setup(app: web.Application):
"""Setup the directory sub-system in the application a la aiohttp fashion

"""
logger.debug("Setting up %s ...", __name__)

_cfg = director_config.get_from(app)

# TODO: create instance of director's client-sdk

# TODO: inject in application




# alias
setup_director = setup

__all__ = (
'setup_director'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
""" director - subsystem's configuration

- defines schema for this subsystem's section in configuration file
- helpers functions to get/set configuration from app configuration

TODO: add validation, get/set app config
"""
from typing import Dict

import trafaret as T
from aiohttp import web

from .application_keys import APP_CONFIG_KEY


THIS_SERVICE_NAME = 'director'


schema = T.Dict({
T.Key("host", default=THIS_SERVICE_NAME): T.String(),
"port": T.Int()
})


def get_from(app: web.Application) -> Dict:
""" Gets section from application's config

"""
return app[APP_CONFIG_KEY][THIS_SERVICE_NAME]



# alias
DIRECTOR_SERVICE = THIS_SERVICE_NAME
director_schema = schema


__all__ = (
"DIRECTOR_SERVICE",
"director_schema"
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
""" Manages lifespan of interactive services.

- uses director's client-sdk to communicate with the director service

"""
# pylint: disable=W0703
# pylint: disable=C0111
Expand All @@ -20,6 +22,10 @@ def session_connect(session_id):


async def session_disconnected(session_id):
""" Stops all running services when session disconnects

TODO: rename on_session_disconnected because is a reaction to that event
"""
log.debug("Session disconnection of session %s", session_id)
try:
director = director_sdk.get_director()
Expand Down Expand Up @@ -50,6 +56,12 @@ async def retrieve_list_of_services():


async def start_service(session_id, service_key, service_uuid, service_version=None):
""" Starts a service registered in the container's registry

:param str service_key: The key (url) of the service (required)
:param str service_uuid: The uuid to assign the service with (required)
:param str service_version: The tag/version of the service
"""
if not service_version:
service_version = "latest"
log.debug("Starting service %s:%s with uuid %s", service_key, service_version, service_uuid)
Expand All @@ -68,6 +80,10 @@ async def start_service(session_id, service_key, service_uuid, service_version=N


async def stop_service(session_id, service_uuid):
""" Stops and removes a running service

:param str service_uuid: The uuid to assign the service with (required)
"""
log.debug("Stopping service with uuid %s", service_uuid)
try:
director = director_sdk.get_director()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
""" reverse proxy subsystem

Dynamically reroutes communication between web-server client and dynamic-backend services (or dyb's)

Use case
- All requests to `/x/{serviceId}/{proxyPath}` are re-routed to resolved dyb service
- dy-services are managed by the director service who monitors and controls its lifetime
- a client-sdk to query the director is passed upon setup
- Customized reverse proxy handlers for dy-jupyter, dy-modeling and dy-3dvis

"""
import logging

from aiohttp import web

from .abc import ServiceResolutionPolicy
from .routing import ReverseChooser
from .handlers import jupyter, paraview
from .settings import URL_PATH

logger = logging.getLogger(__name__)

MODULE_NAME = __name__.split(".")[-1]


def setup(app: web.Application, service_resolver: ServiceResolutionPolicy):
"""Sets up reverse-proxy subsystem in the application (a la aiohttp)

"""
logger.debug("Setting up %s ...", __name__)

chooser = ReverseChooser(resolver=service_resolver)

# Registers reverse proxy handlers customized for specific service types
chooser.register_handler(jupyter.handler,
image_name=jupyter.SUPPORTED_IMAGE_NAME)

chooser.register_handler(paraview.handler,
image_name=paraview.SUPPORTED_IMAGE_NAME)

# /x/{serviceId}/{proxyPath:.*}
app.router.add_route(method='*', path=URL_PATH,
handler=chooser.do_route, name=MODULE_NAME)

# chooser has same lifetime as the application
app[__name__] = {"chooser": chooser}


# alias
setup_reverse_proxy = setup

__all__ = (
'setup_reverse_proxy'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

import abc

from yarl import URL

from .settings import PROXY_MOUNTPOINT


class ServiceResolutionPolicy(metaclass=abc.ABCMeta):
""" Implements an interface to identify and
resolve the location of a dynamic backend service
"""
base_mountpoint = PROXY_MOUNTPOINT

@abc.abstractmethod
async def get_image_name(self, service_identifier: str) -> str:
"""
Identifies a type of service. This normally corresponds
to the name of the docker image
"""
pass

@abc.abstractmethod
async def find_url(self, service_identifier: str) -> URL:
"""
Return the complete url (including the mountpoint) of
the service in the backend

This access should be accesible by the proxy server

E.g. 'http://127.0.0.1:58873/x/ae1q8/'
"""
pass

# TODO: on_closed signal to notify sub-system that the service
# has closed and can raise HTTPServiceAnavailable
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
""" Handlers customized for services

"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
""" Default implementation of reverse-proxy

TODO: https://stackoverflow.com/questions/46788964/trying-to-build-a-proxy-with-aiohttp
TODO: https://github.com/weargoggles/aioproxy/blob/master/aioproxy.py

- another possibility: always request director and thisone will redirect to real server...
CONS: will double #calls
PROS: location of the dyb service can change at will!
"""
import logging
import time

import aiohttp
from aiohttp import web

from yarl import URL
logger = logging.getLogger(__name__)


CHUNK = 32768


async def handler(request: web.Request, service_url: str, **_kwargs) -> web.StreamResponse:
# FIXME: Taken tmp from https://github.com/weargoggles/aioproxy/blob/master/aioproxy.py
start = time.time()
try:
# FIXME: service_url should be service_endpoint or service_origins
tarfind_url = URL(service_url).origin().with_path(
request.path).with_query(request.query)
async with aiohttp.client.request(
request.method, tarfind_url,
headers=request.headers,
chunked=CHUNK,
# response_class=ReverseProxyResponse,
) as r:
logger.debug('opened backend request in %d ms',
((time.time() - start) * 1000))
response = aiohttp.web.StreamResponse(status=r.status,
headers=r.headers)
await response.prepare(request)
content = r.content
while True:
chunk = await content.read(CHUNK)
if not chunk:
break
await response.write(chunk)

logger.debug('finished sending content in %d ms',
((time.time() - start) * 1000,))
await response.write_eof()
return response
except Exception:
logger.debug("reverse proxy %s", request, exec_info=True)
raise web.HTTPServiceUnavailable(reason="Cannot talk to spawner",
content_type="application/json")

# except web.HttpStatus as status:
# return status.as_response()
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
""" Reverse-proxy customized for jupyter notebooks

TODO: document
"""

import asyncio
import logging
import pprint

import aiohttp
from aiohttp import client, web

# TODO: find actual name in registry
SUPPORTED_IMAGE_NAME = "jupyter"
SUPPORTED_IMAGE_TAG = "==0.1.0"

logger = logging.getLogger(__name__)


async def handler(req: web.Request, service_url: str, **_kwargs) -> web.StreamResponse:
# Resolved url pointing to backend jupyter service
tarfind_url = service_url + req.path_qs

reqH = req.headers.copy()
if reqH['connection'] == 'Upgrade' and reqH['upgrade'] == 'websocket' and req.method == 'GET':

ws_server = web.WebSocketResponse()
await ws_server.prepare(req)
logger.info('##### WS_SERVER %s', pprint.pformat(ws_server))

client_session = aiohttp.ClientSession(cookies=req.cookies)
async with client_session.ws_connect(
tarfind_url,
) as ws_client:
logger.info('##### WS_CLIENT %s', pprint.pformat(ws_client))

async def ws_forward(ws_from, ws_to):
async for msg in ws_from:
logger.info('>>> msg: %s', pprint.pformat(msg))
mt = msg.type
md = msg.data
if mt == aiohttp.WSMsgType.TEXT:
await ws_to.send_str(md)
elif mt == aiohttp.WSMsgType.BINARY:
await ws_to.send_bytes(md)
elif mt == aiohttp.WSMsgType.PING:
await ws_to.ping()
elif mt == aiohttp.WSMsgType.PONG:
await ws_to.pong()
elif ws_to.closed:
await ws_to.close(code=ws_to.close_code, message=msg.extra)
else:
raise ValueError(
'unexpected message type: %s' % pprint.pformat(msg))

await asyncio.wait([ws_forward(ws_server, ws_client), ws_forward(ws_client, ws_server)], return_when=asyncio.FIRST_COMPLETED)

return ws_server
else:

async with client.request(
req.method, tarfind_url,
headers=reqH,
allow_redirects=False,
data=await req.read()
) as res:
headers = res.headers.copy()
body = await res.read()
return web.Response(
headers=headers,
status=res.status,
body=body
)
return ws_server


if __name__ == "__main__":
# dummies for manual testing
BASE_URL = 'http://0.0.0.0:8888'
MOUNT_POINT = '/x/fakeUuid'

def adapter(req: web.Request):
return handler(req, service_url=BASE_URL)

app = web.Application()
app.router.add_route('*', MOUNT_POINT + '/{proxyPath:.*}', adapter)
web.run_app(app, port=3984)
Loading